Discussion:
3 commits - createrepo/__init__.py createrepo/utils.py worker.py
z***@osuosl.org
2012-11-29 14:48:11 UTC
createrepo/__init__.py | 89 ++++++++++++++++++++++++++-----------------------
createrepo/utils.py | 16 ++------
worker.py | 39 +++++++++++----------
3 files changed, 74 insertions(+), 70 deletions(-)

New commits:
commit 07e3380a30ec7375aae1a03ef49337eab250f4b2
Author: Zdeněk Pavlas <***@redhat.com>
Date: Fri Nov 23 15:53:07 2012 +0100

save metadata in pkg order

diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index 75710d9..a52ec9e 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -543,6 +543,7 @@ class MetaDataGenerator:
# go on their merry way

newpkgs = []
+ keptpkgs = []
if self.conf.update:
# if we're in --update mode then only act on the new/changed pkgs
for pkg in pkglist:
@@ -558,11 +559,7 @@ class MetaDataGenerator:
if self.conf.verbose:
self.callback.log(_("Using data from old metadata for %s")
% pkg)
-
- old_po.basepath = self.conf.baseurl # reset baseurl in the old pkg
- self.primaryfile.write(old_po.xml_dump_primary_metadata())
- self.flfile.write(old_po.xml_dump_filelists_metadata())
- self.otherfile.write(old_po.xml_dump_other_metadata())
+ keptpkgs.append((pkg, old_po))

#FIXME - if we're in update and we have deltas enabled
# check the presto data for this pkg and write its info back out
@@ -594,15 +591,25 @@ class MetaDataGenerator:
self.read_pkgs.append(pkg)

if po:
- self.primaryfile.write(po.xml_dump_primary_metadata())
- self.flfile.write(po.xml_dump_filelists_metadata())
- self.otherfile.write(po.xml_dump_other_metadata(
- clog_limit=self.conf.changelog_limit))
+ keptpkgs.append((pkg, po))
continue

pkgfiles.append(pkg)
-
-
+
+ keptpkgs.sort(reverse=True)
+ # keptpkgs is a list of (filename, po) pairs; pkgfiles is a list of filenames.
+ # Need to write them in sorted(filename) order. We loop over pkgfiles,
+ # inserting keptpkgs in the right spots (using the upto argument).
+ def save_keptpkgs(upto):
+ while keptpkgs and (upto is None or keptpkgs[-1][0] < upto):
+ filename, po = keptpkgs.pop()
+ # reset baseurl in the old pkg
+ po.basepath = self.conf.baseurl
+ self.primaryfile.write(po.xml_dump_primary_metadata())
+ self.flfile.write(po.xml_dump_filelists_metadata())
+ self.otherfile.write(po.xml_dump_other_metadata(
+ clog_limit=self.conf.changelog_limit))
+
if pkgfiles:
# divide that list by the number of workers and fork off that many
# workers to tmpdirs
@@ -612,6 +619,7 @@ class MetaDataGenerator:
self._worker_tmp_path = tempfile.mkdtemp() # setting this in the base object so we can clean it up later
if self.conf.workers < 1:
self.conf.workers = num_cpus_online()
+ pkgfiles.sort()
worker_chunks = split_list_into_equal_chunks(pkgfiles, self.conf.workers)
worker_cmd_dict = {}
worker_jobs = {}
@@ -671,6 +679,9 @@ class MetaDataGenerator:
self.callback.errorlog('Worker %s: %s' % (num, line.rstrip()))

for i, pkg in enumerate(pkgfiles):
+ # insert cached packages
+ save_keptpkgs(pkg)
+
# save output to local files
log_messages(i % self.conf.workers)

@@ -696,6 +707,7 @@ class MetaDataGenerator:
continue
self.read_pkgs.append(pkgfile)

+ save_keptpkgs(None) # append anything left
return self.current_pkg
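
The net effect of this commit: metadata cached from the old repodata
(keptpkgs) and metadata freshly generated by the workers are written out
as a single, filename-sorted stream. keptpkgs is sorted in reverse so
that list.pop() cheaply yields the smallest remaining filename, and
save_keptpkgs(upto) drains every cached entry that sorts before the next
fresh package. A minimal standalone sketch of that merge pattern follows;
merge_in_order() and emit() are hypothetical names standing in for the
per-package xml_dump_*() writes in the real code:

    def merge_in_order(kept, fresh, emit):
        # reverse-sort so pop() returns the smallest filename in O(1)
        kept = sorted(kept, reverse=True)

        def save_kept(upto):
            # flush cached entries that sort before the next fresh file
            while kept and (upto is None or kept[-1][0] < upto):
                filename, po = kept.pop()
                emit(filename, po)

        for filename in sorted(fresh):
            save_kept(filename)   # cached packages that come first
            emit(filename, None)  # freshly generated metadata
        save_kept(None)           # append anything left over

    merge_in_order([('b.rpm', 'old')], ['a.rpm', 'c.rpm'], print)
    # prints: a.rpm None / b.rpm old / c.rpm None -- sorted order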


commit f0574bbf35ef6d173c662c6093ed6c34dec515bc
Author: Zdeněk Pavlas <***@redhat.com>
Date: Fri Nov 23 14:05:02 2012 +0100

use pipe instead of tempfiles.. ~5% speedup

diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index b3a3c06..75710d9 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -635,20 +635,13 @@ class MetaDataGenerator:
f.write('\n'.join(worker_chunks[worker_num]))
f.close()

- # make the worker directory
workercmdline = []
workercmdline.extend(base_worker_cmdline)
- thisdir = self._worker_tmp_path + '/' + str(worker_num)
- if checkAndMakeDir(thisdir):
- workercmdline.append('--tmpmdpath=%s' % thisdir)
- else:
- raise MDError, "Unable to create worker path: %s" % thisdir
workercmdline.append('--pkglist=%s/pkglist-%s' % (self._worker_tmp_path, worker_num))
worker_cmd_dict[worker_num] = workercmdline



- fds = {}
for (num, cmdline) in worker_cmd_dict.items():
if not self.conf.quiet:
self.callback.log("Spawning worker %s with %s pkgs" % (num,
@@ -656,19 +649,35 @@ class MetaDataGenerator:
job = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
worker_jobs[num] = job
- fds[job.stdout.fileno()] = num, job.stdout, self.callback.log
- fds[job.stderr.fileno()] = num, job.stderr, self.callback.errorlog

- while fds:
- for fd in select(fds, [], [])[0]:
- num, stream, logger = fds[fd]
- line = stream.readline()
- if line == '':
- del fds[fd]
- continue
- logger('Worker %s: %s' % (num, line.rstrip()))
+ files = self.primaryfile, self.flfile, self.otherfile
+ def log_messages(num):
+ job = worker_jobs[num]
+ while True:
+ # check stdout and stderr
+ for stream in select((job.stdout, job.stderr), (), ())[0]:
+ line = stream.readline()
+ if line: break
+ else:
+ return # EOF, EOF
+ if stream is job.stdout:
+ if line.startswith('*** '):
+ # get data, save to local files
+ for out, size in zip(files, line[4:].split()):
+ out.write(stream.read(int(size)))
+ return
+ self.callback.log('Worker %s: %s' % (num, line.rstrip()))
+ else:
+ self.callback.errorlog('Worker %s: %s' % (num, line.rstrip()))
+
+ for i, pkg in enumerate(pkgfiles):
+ # save output to local files
+ log_messages(i % self.conf.workers)

for (num, job) in worker_jobs.items():
+ # process remaining messages on stderr
+ log_messages(num)
+
if job.wait() != 0:
msg = "Worker exited with non-zero value: %s. Fatal." % job.returncode
self.callback.errorlog(msg)
@@ -676,18 +685,6 @@ class MetaDataGenerator:

if not self.conf.quiet:
self.callback.log("Workers Finished")
- # finished with workers
- # go to their dirs and add the contents
- if not self.conf.quiet:
- self.callback.log("Gathering worker results")
- for num in range(self.conf.workers):
- for (fn, fo) in (('primary.xml', self.primaryfile),
- ('filelists.xml', self.flfile),
- ('other.xml', self.otherfile)):
- fnpath = self._worker_tmp_path + '/' + str(num) + '/' + fn
- if os.path.exists(fnpath):
- fo.write(open(fnpath, 'r').read())
-

for pkgfile in pkgfiles:
if self.conf.deltas:
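
In place of the old select() loop and the tempfile gathering pass, each
worker now streams its results back on its own stdout. log_messages()
multiplexes one worker's stdout and stderr: plain lines are logged, while
a line beginning with "*** " is a framing header carrying three byte
counts (primary, filelists, other XML), after which exactly that many
bytes are copied straight into the combined metadata files. A sketch of
that parent-side reader, simplified to a single blocking stream
(read_one_record() and outs are hypothetical names; the real code also
watches stderr via select()):

    def read_one_record(stream, outs):
        line = stream.readline()
        if not line:
            return False                      # EOF -- worker finished
        if line.startswith('*** '):
            # framing header: one byte count per output file
            for out, size in zip(outs, line[4:].split()):
                out.write(stream.read(int(size)))
        else:
            print('Worker: %s' % line.rstrip())
        return True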
diff --git a/createrepo/utils.py b/createrepo/utils.py
index cfe68e6..b0d92ec 100644
--- a/createrepo/utils.py
+++ b/createrepo/utils.py
@@ -191,18 +191,10 @@ def encodefiletypelist(filetypelist):
return result

def split_list_into_equal_chunks(seq, num_chunks):
- if num_chunks <= 1:
- return [seq[:]]
- avg = len(seq) / float(num_chunks)
- out = []
- last = 0.0
- # Due to floating point math, we do one less than the number of chunks
- # and then the rest. Eg. range(1,6), 9
- while len(out) < (num_chunks - 1):
- out.append(seq[int(last):int(last + avg)])
- last += avg
- out.append(seq[int(last):])
-
+ """it's used on sorted input which is then merged in order"""
+ out = [[] for i in range(num_chunks)]
+ for i, item in enumerate(seq):
+ out[i % num_chunks].append(item)
return out

def num_cpus_online(unknown=1):
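
The chunking change is what makes the ordered merge work: the old version
sliced seq into contiguous runs via float arithmetic, while the new one
deals items out round-robin. With a sorted input, package i therefore
lands in chunk i % num_chunks, which is exactly why the parent can call
log_messages(i % self.conf.workers) for package i and get the results
back in globally sorted order. A quick standalone illustration of that
invariant (hypothetical example data):

    def split_list_into_equal_chunks(seq, num_chunks):
        # deal sorted items round-robin: item i goes to chunk i % num_chunks
        out = [[] for _ in range(num_chunks)]
        for i, item in enumerate(seq):
            out[i % num_chunks].append(item)
        return out

    chunks = split_list_into_equal_chunks(sorted(['c', 'a', 'd', 'b']), 2)
    assert chunks == [['a', 'c'], ['b', 'd']]
    # reading chunks[i % 2] for i = 0..3 yields a, b, c, d -- sorted again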
commit 3eba159db3690a516ad389a7e4e52fa2167c404c
Author: Zdeněk Pavlas <***@redhat.com>
Date: Fri Nov 23 14:43:41 2012 +0100

worker: no --tmpmdpath => use stdout

diff --git a/worker.py b/worker.py
index 23c87a3..fe6758f 100755
--- a/worker.py
+++ b/worker.py
@@ -39,10 +39,6 @@ def main(args):
opts, pkgs = parser.parse_args(args)
external_data = {'_packagenumber': 1}
globalopts = {}
- if not opts.tmpmdpath:
- print >> sys.stderr, "tmpmdpath required for destination files"
- sys.exit(1)
-

for strs in opts.pkgoptions:
k,v = strs.split('=')
@@ -67,10 +63,19 @@ def main(args):

reldir = external_data['_reldir']
ts = rpmUtils.transaction.initReadOnlyTransaction()
- pri = open(opts.tmpmdpath + '/primary.xml' , 'w')
- fl = open(opts.tmpmdpath + '/filelists.xml' , 'w')
- other = open(opts.tmpmdpath + '/other.xml' , 'w')
-
+ if opts.tmpmdpath:
+ files = [open(opts.tmpmdpath + '/%s.xml' % i, 'w')
+ for i in ('primary', 'filelists', 'other')]
+ def output(*xml):
+ for fh, buf in zip(files, xml):
+ fh.write(buf)
+ else:
+ def output(*xml):
+ buf = ' '.join(str(len(i)) for i in xml)
+ sys.stdout.write('*** %s\n' % buf)
+ for buf in xml:
+ sys.stdout.write(buf)
+
if opts.pkglist:
for line in open(opts.pkglist,'r').readlines():
line = line.strip()
@@ -78,10 +83,14 @@ def main(args):
continue
pkgs.append(line)

+ clog_limit=globalopts.get('clog_limit', None)
+ if clog_limit is not None:
+ clog_limit = int(clog_limit)
for pkgfile in pkgs:
pkgpath = reldir + '/' + pkgfile
if not os.path.exists(pkgpath):
print >> sys.stderr, "File not found: %s" % pkgpath
+ output()
continue

try:
@@ -91,21 +100,15 @@ def main(args):
pkg = createrepo.yumbased.CreateRepoPackage(ts, package=pkgpath,
sumtype=globalopts.get('sumtype', None),
external_data=external_data)
- pri.write(pkg.xml_dump_primary_metadata())
- fl.write(pkg.xml_dump_filelists_metadata())
- clog_limit=globalopts.get('clog_limit', None)
- if clog_limit is not None:
- clog_limit = int(clog_limit)
- other.write(pkg.xml_dump_other_metadata(clog_limit=clog_limit))
+ output(pkg.xml_dump_primary_metadata(),
+ pkg.xml_dump_filelists_metadata(),
+ pkg.xml_dump_other_metadata(clog_limit=clog_limit))
except yum.Errors.YumBaseError, e:
print >> sys.stderr, "Error: %s" % e
+ output()
continue
else:
external_data['_packagenumber']+=1

- pri.close()
- fl.close()
- other.close()
-
if __name__ == "__main__":
main(sys.argv[1:])
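
On the worker side, output() is the mirror image of the parent's reader:
with --tmpmdpath it keeps the old three-file behaviour, otherwise it
frames each package's XML fragments on stdout behind a
"*** <len> <len> <len>" header. Note that the error paths also call
output() with no arguments, emitting an empty header so the parent,
which consumes exactly one record per package, never falls out of sync.
A minimal standalone sketch of the writer side (write_record() is a
hypothetical name):

    import sys

    def write_record(*xml):
        # header: "*** " + space-separated lengths, one per fragment
        sys.stdout.write('*** %s\n' % ' '.join(str(len(b)) for b in xml))
        for buf in xml:
            sys.stdout.write(buf)   # payload follows the header verbatim

    write_record('<package/>', '<filelists/>', '<other/>')
    write_record()  # error path: empty record keeps parent/worker in sync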
