svn commit: r220976 - in projects/portbuild: conf qmanager scripts
Florent Thoumie
flz at FreeBSD.org
Sat Apr 23 21:02:26 UTC 2011
Author: flz
Date: Sat Apr 23 21:02:25 2011
New Revision: 220976
URL: http://svn.freebsd.org/changeset/base/220976
Log:
Import qmanager into portbuild.
Added:
projects/portbuild/qmanager/
projects/portbuild/qmanager/acl.py
projects/portbuild/qmanager/dumpdb.py (contents, props changed)
projects/portbuild/qmanager/packagebuild (contents, props changed)
projects/portbuild/qmanager/qclient (contents, props changed)
projects/portbuild/qmanager/qmanager (contents, props changed)
projects/portbuild/qmanager/qmanager.py (contents, props changed)
projects/portbuild/qmanager/qmanagerclient.py
projects/portbuild/qmanager/qmanagerhandler.py
projects/portbuild/qmanager/qmanagerobj.py
projects/portbuild/qmanager/schema.sql
Modified:
projects/portbuild/conf/server.conf
projects/portbuild/scripts/dopackages
Modified: projects/portbuild/conf/server.conf
==============================================================================
--- projects/portbuild/conf/server.conf Sat Apr 23 20:59:58 2011 (r220975)
+++ projects/portbuild/conf/server.conf Sat Apr 23 21:02:25 2011 (r220976)
@@ -72,7 +72,6 @@ PDISPATCH_TIMEOUT=360000
# qmanager definitions (note: Python script, so avoid {})
#
-QMANAGER_PATH=/var/portbuild/evil/qmanager
QMANAGER_DATABASE_FILE=qdb.sl3
QMANAGER_SOCKET_FILE=/tmp/.qmgr
Added: projects/portbuild/qmanager/acl.py
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ projects/portbuild/qmanager/acl.py Sat Apr 23 21:02:25 2011 (r220976)
@@ -0,0 +1,156 @@
+# Validate a (uid, (gids)) tuple against an ACL.
+
+import pwd, grp
+
+def getuidbyname(username):
+ if str(username).isdigit():
+ return int(username)
+ return pwd.getpwnam(username)[2]
+
+def getgidbyname(grname):
+ if str(grname).isdigit():
+ return int(grname)
+ return grp.getgrnam(grname)[2]
+
+class ACLElement(object):
+ """ Component of an ACL. """
+
+ def __init__(self, name, uidlist, gidlist, sense):
+ self.name = name
+ self.uidlist = [getuidbyname(uid) for uid in uidlist]
+ self.gidlist = [getgidbyname(gid) for gid in gidlist]
+ self.sense = bool(sense)
+
+ def validate(self, uid, gids):
+ """ Validate an ACL Element. In order to match, the following must
+ hold:
+
+ * uid is a subset of self.uidlist, or self.uidlist is empty
+ * one of the gids must be present in self.gidlist, or
+ self.gidlist is empty
+
+ If both conditions hold, then the validation returns self.sense
+
+ Returns: True/False if Element matches
+ None if Element fails to match
+ """
+
+ if (len(self.uidlist) == 0 or uid in self.uidlist) and \
+ (len(self.gidlist) == 0 or set(gids).intersection(self.gidlist)):
+ return self.sense
+ return None
+
+class ACL(object):
+ """ List of ACLElements that form an ACL """
+
+ def __init__(self, acllist):
+ self.acls = acllist
+
+ def validate(self, uid, gids):
+ uid=getuidbyname(uid)
+ gids=set(getgidbyname(gid) for gid in gids)
+
+ for acl in self.acls:
+ res=acl.validate(uid, gids)
+ if res is not None:
+ return res
+ return False
+
+if __name__ == "__main__":
+
+ from sys import exit
+
+ assert getuidbyname(123) == 123
+ assert getuidbyname('123') == 123
+
+ # Unknown user names must make pwd.getpwnam raise KeyError.
+ try:
+ ACLElement("test", ["foobar"], [""], True)
+ except KeyError:
+ pass
+ else:
+ raise AssertionError("expected KeyError for unknown user")
+
+ try:
+ ACLElement("test", [123, "foobar"], [""], True)
+ except KeyError:
+ pass
+ else:
+ raise AssertionError("expected KeyError for unknown user")
+
+ # Construction with numeric uids (int or string) must succeed.
+ assert ACLElement("test", [123], [], True) is not None
+ assert ACLElement("test", ["123"], [], True) is not None
+
+ acl = ACL([ACLElement("el 1", ["kris"], [], True),
+ ACLElement("el 2", [], ["wheel"], True),
+ ACLElement("el 3", [], [], False)])
+
+ assert acl.validate(getuidbyname('kris'), []) == True
+ assert acl.validate(getuidbyname('simon'), []) == False
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('root'), [pwd.getpwnam('root')[3]]) == True
+
+ acl = ACL([ACLElement("el 1", ["kris"], ["distcc"], True),
+ ACLElement("el 2", [], ["wheel"], True),
+ ACLElement("el 3", [], [], False)])
+ assert acl.validate("kris", ["wheel"]) == True
+ assert acl.validate("kris", ["staff"]) == False
+
+ acl = ACL([ACLElement("", ('kris',), (), True),
+ ACLElement("", (), ('wheel', 'devel'), True),
+ ACLElement("", (), (), False)])
+
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+ acl = ACL([ACLElement("", ('kris',), (), True),
+ ACLElement("", (), ('devel',), True),
+ ACLElement("", (), ('wheel',), True),
+ ACLElement("", (), (), False)])
+
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+ acl = ACL([ACLElement("", ('kris',), (), True),
+ ACLElement("", (), ('devel',), False),
+ ACLElement("", (), ('wheel',), True),
+ ACLElement("", (), (), False)])
+
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == False
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+
+ acl = ACL([ACLElement("", ('kris',), (), True),
+ ACLElement("", (), ('devel',), True),
+ ACLElement("", (), ('wheel',), False),
+ ACLElement("", (), (), False)])
+
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+
+ acl = ACL([ACLElement("", ('kris',), (), True),
+ ACLElement("", (), ('devel',), True),
+ ACLElement("", (), ('wheel',), False),
+ ACLElement("", (), (), True)])
+
+ assert acl.validate(getuidbyname('simon'), []) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == True
+
+ acl = ACL([ACLElement("", ('kris',), (), False),
+ ACLElement("", (), ('devel',), True),
+ ACLElement("", (), ('wheel',), False),
+ ACLElement("", (), (), True)])
+
+ assert acl.validate(getuidbyname('simon'), []) == True
+ assert acl.validate(getuidbyname('kris'), []) == False
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False
+ assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == True
+
+ acl = ACL([ACLElement("", (4206,), set([]), True),
+ ACLElement("", (), set([]), False)])
+
+ assert acl.validate(4206, (4206, 31337)) == True
+ assert acl.validate(4201, (4201, 31337)) == False
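The tests above exercise the key property of ACL.validate: elements are evaluated in list order, and the first element whose uid and gid conditions both hold decides the outcome via its sense flag, with an empty uidlist or gidlist acting as a wildcard. A minimal standalone sketch of that first-match-wins evaluation, using made-up numeric ids so it runs without consulting the password database:

# First-match-wins evaluation as in ACL.validate; ids are illustrative.
rules = [
    # (uidlist, gidlist, sense) -- an empty list matches anything
    ([1001], [], True),   # uid 1001 is always allowed
    ([], [0], True),      # anyone in gid 0 (wheel) is allowed
    ([], [], False),      # default deny
]

def check(uid, gids):
    for uidlist, gidlist, sense in rules:
        uid_ok = not uidlist or uid in uidlist
        gid_ok = not gidlist or set(gids).intersection(gidlist)
        if uid_ok and gid_ok:
            return sense      # first matching element decides
    return False              # no element matched

assert check(1001, [100])        # matched by the first rule
assert check(1002, [0, 100])     # matched by the wheel rule
assert not check(1002, [100])    # falls through to the default deny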
Added: projects/portbuild/qmanager/dumpdb.py
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ projects/portbuild/qmanager/dumpdb.py Sat Apr 23 21:02:25 2011 (r220976)
@@ -0,0 +1,140 @@
+#
+# try doing some SQL reads as a test
+#
+from freebsd_config import *
+
+import os
+
+from qmanagerobj import *
+
+CONFIG_DIR="/var/portbuild"
+CONFIG_SUBDIR="conf"
+CONFIG_FILENAME="server.conf"
+
+# pieces of qmanagerobj.startup
+def obj_startup(filename):
+
+ engine = create_engine('sqlite:///' + filename, echo=True)
+ Session = sessionmaker(bind=engine)
+ session = Session()
+
+ Base.metadata.create_all(engine)
+
+ return (engine, session)
+
+
+def show_acl( session ):
+
+ acls = session.query(QManagerACL)
+ acls = acls.order_by('name')
+
+ print
+ print 'starting dump of acl table:'
+ print
+
+ for acl in acls:
+
+ print
+ print "name: %s" % acl.name
+ # list
+ print "uidlist: " + str( acl.uidlist )
+ # list
+ print "gidlist: " + str( acl.gidlist )
+ print "sense: " + str( acl.sense )
+
+
+def show_jobs( session ):
+
+ jobs = session.query(Job)
+ jobs = jobs.order_by('id')
+
+ print
+ print 'starting dump of Job table:'
+ print
+
+ for job in jobs:
+
+ print
+ # job ID
+ print "job id: " + `job.id`
+ # Name of job
+ print "name: " + job.name
+ # priority of request
+ print "priority: " + `job.priority`
+ # job type
+ print "type: " + job.type
+ # uid of job owner
+ print "owner: " + `job.owner`
+ # gids of job owner (tuple)
+ #print str( type( job.gids ) )
+ print "gids: " + str( job.gids )
+ # machines that satisfied initial query (list)
+ #print str( type( job.machines ) )
+ print "machines: " + str( job.machines )
+ # Time job started/blocked (must not be modified when job is
+ # blocked or it will corrupt the heapq)
+ print "startttime: " + `job.starttime`
+ # initial machine description in case we have to revalidate (list)
+ # print str( type( job.mdl ) )
+ print "mdl: " + str( job.mdl )
+ # True --> job is running; False --> job is blocked
+ print "running: " + str( job.running )
+
+
+def show_machines( session ):
+
+ machines = session.query(Machine)
+ machines = machines.order_by('name')
+
+ print
+ print 'starting dump of Machines table:'
+ print
+
+ for machine in machines:
+
+ print
+ print "name: %s" % machine.name
+ # list
+ print "acl: " + str( machine.acl )
+ # boolean
+ print "haszfs: " + str( machine.haszfs )
+ # boolean
+ print "online: " + str( machine.online )
+
+
+def show_machines_for_arch( engine, arch ):
+
+ mdl = ["arch = %s" % arch]
+
+ q = SQL.construct(Machine, mdl)
+ res = engine.execute(Machine.__table__.select(q))
+ result = [SQL.to_dict(Machine, i) for i in res]
+
+ print
+ for machine in result:
+ print "machine for %s : %s " % ( arch, machine[ 'name' ] )
+
+
+# main
+
+if __name__ == '__main__':
+
+ print "acquiring engine and session"
+ config = getConfig( CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME )
+ QMANAGER_PATH = config.get( 'QMANAGER_PATH' )
+ QMANAGER_DATABASE_FILE = config.get( 'QMANAGER_DATABASE_FILE' )
+ (engine, session) = obj_startup( \
+ os.path.join( QMANAGER_PATH, QMANAGER_DATABASE_FILE ) )
+ print "acquired engine and session"
+ # print "engine = '" + str( engine ) + "', session = '" + str( session ) + "'"
+
+ show_acl( session )
+ show_machines( session )
+ show_jobs( session )
+
+ show_machines_for_arch( engine, 'i386' )
+ show_machines_for_arch( engine, 'amd64' )
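dumpdb.py leans on SQLAlchemy's declarative pattern: obj_startup() creates an engine for the SQLite file, binds a session factory to it, and makes sure the tables behind Base exist. The real models live in qmanagerobj.py (added by this commit but not shown above), so the Machine class below is only a stand-in carrying the fields show_machines() prints, not the actual schema, and the host name is invented:

# Hedged sketch of the pattern obj_startup()/show_machines() rely on;
# the Machine model here is a stand-in, not the qmanagerobj schema.
from sqlalchemy import create_engine, Column, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Machine(Base):
    __tablename__ = 'machines'
    name = Column(String, primary_key=True)
    haszfs = Column(Boolean, default=False)
    online = Column(Boolean, default=True)

engine = create_engine('sqlite:///:memory:')   # qmanager uses a file DB
Base.metadata.create_all(engine)               # same call as obj_startup
session = sessionmaker(bind=engine)()

session.add(Machine(name='gohan10', haszfs=True, online=True))
session.commit()

for machine in session.query(Machine).order_by(Machine.name):
    print "%s haszfs=%s online=%s" % (machine.name, machine.haszfs,
                                      machine.online)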
Added: projects/portbuild/qmanager/packagebuild
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ projects/portbuild/qmanager/packagebuild Sat Apr 23 21:02:25 2011 (r220976)
@@ -0,0 +1,649 @@
+#!/usr/bin/env python
+
+# Improved build dispatcher. Invoked on server-side from dopackages.
+
+# We try to build leaf packages (those that can be built immediately,
+# with no outstanding dependencies) in an order such that the ones
+# required by the longest dependency chains are built first.
+#
+# This has the effect of favouring deep parts of the package tree and
+# evening out the depth over time, hopefully avoiding the situation
+# where the entire cluster waits for a deep part of the tree to
+# build on a small number of machines.
+#
+# We can dynamically respond to changes in build machine availability,
+# since the queue manager will block jobs that cannot be immediately
+# satisfied and will unblock us when a job slot becomes available.
+#
+# When a package build fails, it is requeued with a lower priority
+# such that it will be retried as soon as no "phase 1" packages
+# are available to build. This prevents the cluster from sitting
+# idle until the last phase 1 package builds.
+#
+# Another advantage is that this system is easily customizable; in
+# the future it will let us tune things like the policy for matching
+# jobs to machines. For example, we could avoid dispatching multiple
+# openoffice builds to the same system.
+#
+# TODO:
+# * Combine build prep stages?
+# - initial check for file up-to-date
+# * check mtime for package staleness (cf make)
+# * option to skip phase 2
+
+from qmanagerclient import *
+
+from freebsd_config import *
+
+import os, string, sys, threading, time, subprocess
+#import random
+from itertools import chain
+#import gc
+from stat import *
+
+from Queue import Queue
+from heapq import *
+
+CONFIG_DIR="/var/portbuild"
+CONFIG_SUBDIR="conf"
+CONFIG_FILENAME="server.conf"
+
+config = getConfig( CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME )
+QMANAGER_MAX_JOB_ATTEMPTS = int( \
+ config.get( 'QMANAGER_MAX_JOB_ATTEMPTS' ) )
+QMANAGER_PRIORITY_PACKAGES = string.split( \
+ config.get( 'QMANAGER_PRIORITY_PACKAGES' ) )
+QMANAGER_RUNAWAY_PERCENTAGE = float( \
+ config.get( 'QMANAGER_RUNAWAY_PERCENTAGE' ) )
+QMANAGER_RUNAWAY_THRESHOLD = int( \
+ config.get( 'QMANAGER_RUNAWAY_THRESHOLD' ) )
+
+DEBUG = False
+
+categories = {}
+ports = {}
+
+# When a build fails we requeue it with a lower priority such that it
+# will never preempt a phase 1 build but will build when spare
+# capacity is available.
+PHASE2_BASE_PRIO=1000
+
+# Process success quickly so other jobs are started
+SUCCESS_PRIO = -1000
+
+# Failure should be a less common event :)
+FAILURE_PRIO = -900
+
+# Port status codes
+PENDING = 1 # Yet to build
+PHASE2 = 2 # Failed once
+
+class PriorityQueue(Queue):
+ """Variant of Queue that retrieves open entries in
+ priority order (lowest first).
+ Entries are typically tuples of the form: (priority number,
+ data)
+ This class can be found at: Python-2.6a3/Lib/Queue.py
+ """
+ maxsize = 0
+
+ def _init(self, maxsize):
+ self.queue = []
+
+ def _qsize(self, len=len):
+ return len(self.queue)
+
+ def _put(self, item, heappush=heappush):
+ heappush(self.queue, item)
+
+ def _get(self, heappop=heappop):
+ return heappop(self.queue)
+
+class Index(object):
+
+ def __init__(self, indexfile):
+ self.indexfile = indexfile
+
+ def parse(self, targets = None):
+
+ print "[MASTER] Read index"
+ f = file(self.indexfile)
+ index = f.readlines()
+ f.close()
+ f = None
+ del f
+
+ lines=[]
+ print "[MASTER] Phase 1"
+ for i in index:
+ (name, path, prefix, comment, descr, maintainer, categories, bdep,
+ rdep, www, edep, pdep, fdep) = i.rstrip().split("|")
+
+ if targets is None or name in targets:
+ lines.append((name, bdep, rdep, edep, pdep, fdep))
+
+ Port(name, path, "", "", "", "",
+ categories, "")
+ index = None
+ del index
+
+ print "[MASTER] Phase 2"
+ for (name, bdep, rdep, edep, pdep, fdep) in lines:
+ ports[name].setdeps(bdep, rdep, edep, pdep, fdep)
+
+ lines = None
+ del lines
+ print "[MASTER] Done"
+
+def depthindex(targets):
+ """ Initial population of depth tree """
+
+ for i in targets:
+ i.depth_recursive()
+
+class Port(object):
+
+ # NB: __slots__ must be a class attribute to take effect, and it
+ # has to cover every attribute assigned on instances (including
+ # the bookkeeping fields id, status, attempts and done).
+ __slots__ = ["name", "path", "prefix", "comment", "descr",
+ "maintainer", "www", "bdep", "rdep", "edep", "pdep",
+ "fdep", "alldep", "parents", "depth", "categories",
+ "id", "status", "attempts", "done"]
+
+ def __init__(self, name, path, prefix, comment, descr, maintainer,
+ cats, www):
+
+ self.name = name
+ self.path = path
+ self.prefix = prefix
+ self.comment = comment
+ self.descr = descr
+ self.maintainer = maintainer
+ self.www = www
+
+ # Populated later
+ self.bdep = []
+ self.rdep = []
+ self.edep = []
+ self.pdep = []
+ self.fdep = []
+
+ self.alldep = []
+ self.parents = []
+ self.id = None # XXX
+
+ self.status = PENDING
+ self.attempts = 0
+
+ # Whether the package build has completed and is hanging around
+ # to resolve dependencies for others XXX use status
+ self.done = False
+
+ # Depth is the maximum length of the dependency chain of this port
+ self.depth = None
+
+ self.categories=[]
+ scats = cats.split()
+ if len(scats) != len(set(scats)):
+ print "[MASTER] Warning: port %s includes duplicated categories: %s" % (name, cats)
+
+ for c in set(scats):
+ try:
+ cat = categories[c]
+ except KeyError:
+ cat = Category(c)
+
+ self.categories.append(cat)
+ cat.add(self)
+
+ ports[name] = self
+
+ def remove(self):
+ """ Clean ourselves up but don't touch references in other objects;
+they still need to know about us as dependencies etc """
+
+ self.fdep = None
+ self.edep = None
+ self.pdep = None
+ self.bdep = None
+ self.rdep = None
+ self.alldep = None
+ self.parents = None
+
+ for cat in self.categories:
+ cat.remove(self)
+
+ ports[self.name] = None
+ del ports[self.name]
+ del self
+
+ def destroy(self):
+ """ Remove a package and all references to it """
+
+ for pkg in self.alldep:
+ if pkg.parents is not None:
+ # Already removed but not destroyed
+ try:
+ pkg.parents.remove(self)
+ except ValueError:
+ continue
+
+ for pkg in self.parents:
+ try:
+ pkg.fdep.remove(self)
+ except ValueError:
+ pass
+ try:
+ pkg.edep.remove(self)
+ except ValueError:
+ pass
+ try:
+ pkg.pdep.remove(self)
+ except ValueError:
+ pass
+ try:
+ pkg.bdep.remove(self)
+ except ValueError:
+ pass
+ try:
+ pkg.rdep.remove(self)
+ except ValueError:
+ pass
+ pkg.alldep.remove(self)
+
+ sys.exc_clear()
+
+ self.remove()
+
+ def setdeps(self, bdep, rdep, edep, pdep, fdep):
+ self.fdep = [ports[p] for p in fdep.split()]
+ self.edep = [ports[p] for p in edep.split()]
+ self.pdep = [ports[p] for p in pdep.split()]
+ self.bdep = [ports[p] for p in bdep.split()]
+ self.rdep = [ports[p] for p in rdep.split()]
+
+ self.alldep = list(set(chain(self.fdep, self.edep, self.pdep,
+ self.bdep, self.rdep)))
+
+ for p in self.alldep:
+ p.parents.append(self)
+
+ def depth_recursive(self):
+
+ """
+ Recursively populate the depth tree up from a given package
+ through dependencies, assuming empty values on entries not yet
+ visited
+ """
+
+ if self.depth is None:
+ if len(self.parents) > 0:
+ max = 0
+ for i in self.parents:
+ w = i.depth_recursive()
+ if w > max:
+ max = w
+ self.depth = max + 1
+ else:
+ self.depth = 1
+ for port in QMANAGER_PRIORITY_PACKAGES:
+ if self.name.startswith(port):
+ # Artificial boost to try and get it building earlier
+ self.depth = 100
+ return self.depth
+
+ def destroy_recursive(self):
+ """ Remove a port and everything that depends on it """
+
+ parents=set([self])
+
+ while len(parents) > 0:
+ pkg = parents.pop()
+ assert pkg.depth is not None
+ parents.update(pkg.parents)
+ pkg.destroy()
+
+ def success(self):
+ """ Build succeeded and possibly uncovered some new leaves """
+
+ parents = self.parents[:]
+ self.done = True
+ self.remove()
+
+ newleafs = [p for p in parents if all(c.done for c in p.alldep)]
+ return newleafs
+
+ def failure(self):
+ """ Build failed """
+
+ self.destroy_recursive()
+
+ def packagename(self, arch, branch, buildid):
+ """ Return the path where a package may be found"""
+
+ return "/var/portbuild/%s/%s/builds/%s/packages/All/%s.tbz" \
+ % (arch, branch, buildid, self.name)
+
+ def is_stale(self, arch, branch, buildid):
+ """ Does a package need to be (re)-built?
+
+ Returns: False: if it exists and has newer mtime than all of
+ its dependencies.
+ True: otherwise
+ """
+
+ my_pkgname = self.packagename(arch, branch, buildid)
+ pkg_exists = os.path.exists(my_pkgname)
+
+ if pkg_exists:
+ my_mtime = os.stat(my_pkgname)[ST_MTIME]
+
+ dep_packages = [pkg.packagename(arch, branch, buildid)
+ for pkg in self.alldep]
+ deps_exist = all(os.path.exists(pkg) for pkg in dep_packages)
+ return not (pkg_exists and deps_exist and
+ all(os.stat(pkg)[ST_MTIME] <= my_mtime
+ for pkg in dep_packages))
+
+class Category(object):
+ def __init__(self, name):
+ self.name = name
+ self.ports = {}
+ categories[name] = self
+
+ def add(self, port):
+ self.ports[port] = port
+
+ def remove(self, port):
+ self.ports[port]=None
+ del self.ports[port]
+
+def gettargets(targets):
+ """ split command line arguments into list of packages to build.
+ Returns set or iterable of all ports that will be built including
+ dependencies """
+
+ plist = set()
+ if len(targets) == 0:
+ targets = ["all"]
+ for i in targets:
+ if i == "all":
+ return ports.itervalues()
+
+ if i.endswith("-all"):
+ cat = i.rpartition("-")[0]
+ plist.update(p.name for p in categories[cat].ports)
+ elif i.endswith(".tbz") and i[:-len(".tbz")] in ports:
+ # NB: str.rstrip(".tbz") strips any trailing '.', 't', 'b' or
+ # 'z' characters, mangling names like "gtest"; strip the
+ # literal suffix instead.
+ plist.add(i[:-len(".tbz")])
+ elif i in ports:
+ plist.add(i)
+ else:
+ raise KeyError, i
+
+ # Compute transitive closure of all dependencies
+ pleft=plist.copy()
+ while len(pleft) > 0:
+ pkg = pleft.pop()
+ new = [p.name for p in ports[pkg].alldep]
+ plist.update(new)
+ pleft.update(new)
+
+ for p in set(ports.keys()).difference(plist):
+ ports[p].destroy()
+
+ return [ports[p] for p in plist]
+
+class worker(threading.Thread):
+
+ # Protects threads
+ lock = threading.Lock()
+
+ # Running threads, used for collecting status
+ threads = {}
+
+ def __init__(self, mach, job, arch, branch, buildid, queue):
+ threading.Thread.__init__(self)
+ self.machine = mach
+ self.job = job
+ self.arch = arch
+ self.branch = branch
+ self.buildid = buildid
+ self.queue = queue
+
+ self.setDaemon(True)
+
+ def run(self):
+ pkg = self.job
+
+ print "[MASTER] Running job %s" % (pkg.name),
+ if pkg.status == PHASE2:
+ print " (phase 2)"
+ else:
+ print
+ try:
+ runenv={'HOME':"/root",
+ 'PATH':'/sbin:/bin:/usr/sbin:/usr/bin:/usr/games:/usr/local/sbin:/usr/local/bin:/var/portbuild/scripts',
+ 'FD':" ".join(["%s.tbz" % p.name for p in pkg.fdep]),
+ 'ED':" ".join(["%s.tbz" % p.name for p in pkg.edep]),
+ 'PD':" ".join(["%s.tbz" % p.name for p in pkg.pdep]),
+ 'BD':" ".join(["%s.tbz" % p.name for p in pkg.bdep]),
+ 'RD':" ".join(["%s.tbz" % p.name for p in pkg.rdep])}
+ for var in ["NOCLEAN", "NO_RESTRICTED", "NOPLISTCHECK", "NO_DISTFILES", "FETCH_ORIGINAL", "TRYBROKEN" ]:
+ if var in os.environ:
+ runenv[var] = os.environ.get(var)
+ build = subprocess.Popen(
+ ["/bin/sh", "/var/portbuild/scripts/pdispatch",
+ self.arch, self.branch, self.buildid, self.machine,
+ "/var/portbuild/scripts/portbuild", "%s.tbz" % pkg.name,
+ pkg.path],
+ env=runenv,
+ stderr=subprocess.STDOUT, stdout=subprocess.PIPE, bufsize=0)
+ except OSError, e:
+ print >>sys.stderr, "[%s:%s]: Execution failed: %s" % \
+ (pkg.id, pkg.name, e)
+ # bail out: "build" is undefined if Popen failed
+ return
+ while True:
+ try:
+ line = build.stdout.readline()
+ except:
+ print "[%s:%s]: Failed reading from build script" % \
+ (pkg.id, pkg.name)
+ break
+ if line == "":
+ break
+ print "[%s:%s] %s" % (pkg.id, pkg.name, line.rstrip())
+
+ retcode = build.wait()
+
+# time.sleep(random.randint(0,60))
+#
+# r = random.random()
+# if r < 0.1:
+# retcode = 1
+# elif r < 0.15:
+# retcode = 254
+# else:
+# retcode = 0
+
+ conn = QManagerClientConn(stderr = sys.stderr)
+ timeout = 1
+ try:
+ (code, vars) = conn.command("release", {'id':pkg.id})
+ except RequestError, e:
+ print "[MASTER] Error releasing job %s (%s): %s" % (pkg.name, pkg.id, e.value)
+
+ if DEBUG:
+ print "[MASTER] got retcode %d from pkg %s" % (retcode, pkg.name)
+ if retcode == 254:
+ # Requeue soft failure at original priority
+ # XXX exponential backoff?
+ time.sleep(60)
+# print "Requeueing %s" % pkg.id
+ self.queue.put((-pkg.depth, pkg))
+ elif retcode == 253:
+ # setting up a machine, we should immediately retry
+ self.queue.put((-pkg.depth, pkg))
+ elif retcode == 0:
+ self.queue.put((SUCCESS_PRIO, pkg))
+ else:
+ self.queue.put((FAILURE_PRIO, pkg))
+
+ # Clean up
+ worker.lock.acquire()
+ worker.threads[self]=None
+ del worker.threads[self]
+ worker.lock.release()
+
+ @staticmethod
+ def dispatch(mach, job, arch, branch, buildid, queue):
+ wrk = worker(mach, job, arch, branch, buildid, queue)
+
+ worker.lock.acquire()
+ worker.threads[wrk] = wrk
+ worker.lock.release()
+
+ wrk.start()
+
+def main(arch, branch, buildid, args):
+ global index
+
+ basedir="/var/portbuild/"+arch+"/"+branch+"/builds/"+buildid
+ portsdir=basedir+"/ports"
+
+ # get the major branch number.
+ branchbase = branch.split("-")[ 0 ]
+ # XXX ERWLA - Ugly hack
+ branchbase = branchbase.split(".")[ 0 ]
+ indexfile=portsdir+"/INDEX-"+branchbase
+
+ print "[MASTER] parseindex..."
+ index = Index(indexfile)
+ index.parse()
+ print "[MASTER] length = %s" % len(ports)
+
+ print "[MASTER] Finding targets..."
+ targets = gettargets(args)
+
+ print "[MASTER] Calculating depth..."
+ depthindex(targets)
+
+ print "[MASTER] Pruning duds..."
+ dudsfile=basedir+"/duds"
+ for line in file(dudsfile):
+ try:
+ dud = ports[line.rstrip()]
+ except KeyError:
+ continue
+ print "[MASTER] Skipping %s (duds)" % dud.name
+ dud.destroy_recursive()
+
+ queue = PriorityQueue()
+ # XXX can do this while parsing index if we prune targets/duds
+ # first
+ for pkg in ports.itervalues():
+ if len(pkg.alldep) == 0:
+ queue.put((-pkg.depth, pkg))
+
+ # XXX check osversion, pool
+ mdl=["arch = %s" % arch]
+
+ # Main work loop
+ completed_jobs = 0
+ failed_jobs = 0
+ while len(ports) > 0:
+ print "[MASTER] Ports remaining=%s, Queue length=%s" % (len(ports), queue.qsize())
+
+ if len(ports) < 10:
+ print "[MASTER] Remaining ports: %s" % ports.keys()
+
+ (prio, job) = queue.get()
+ if DEBUG:
+ print "[MASTER] Job %s pulled from queue with prio %d" % ( job.name, prio )
+ if prio == SUCCESS_PRIO:
+ print "[MASTER] Job %s succeeded" % job.name
+ for new in job.success():
+ queue.put((-new.depth, new))
+ completed_jobs = completed_jobs + 1
+ continue
+ elif prio == FAILURE_PRIO:
+ if job.status == PHASE2:
+ print "[MASTER] Job %s failed" % job.name
+ job.failure()
+ continue
+ else:
+ # XXX MCL 20110421
+ completed_jobs = completed_jobs + 1
+ failed_jobs = failed_jobs + 1
+ if DEBUG:
+ print "[MASTER] jobs: %d failed jobs out of %d:" % \
+ ( failed_jobs, completed_jobs )
+ if completed_jobs > QMANAGER_RUNAWAY_THRESHOLD and \
+ float( failed_jobs ) / completed_jobs > QMANAGER_RUNAWAY_PERCENTAGE:
+ print "[MASTER] ERROR: runaway build detected: %d failed jobs out of %d:" % \
+ ( failed_jobs, completed_jobs )
+ print "[MASTER] RUN TERMINATED."
+ break
+
+ job.attempts = job.attempts + 1
+ # XXX MCL in theory, if all this code worked correctly,
+ # this condition would never trigger. In practice,
+ # however, it does, so bomb out before filling portmgr's
+ # mbox.
+ # XXX MCL 20110422 perhaps this code has been fixed now;
+ # XXX it did not use to work:
+ if job.attempts > QMANAGER_MAX_JOB_ATTEMPTS:
+ print "[MASTER] Job %s failed %d times; RUN TERMINATED." % ( job.name, job.attempts )
+ break
+ else:
+ # Requeue at low priority
+ print "[MASTER] Job %s failed (requeued for phase 2)" % job.name
+ job.status = PHASE2
+ queue.put((PHASE2_BASE_PRIO-job.depth, job))
+ continue
+ elif job.status == PHASE2:
+ depth = -(prio - PHASE2_BASE_PRIO)
+ else:
+ depth = -prio
+
+ print "[MASTER] Working on job %s, depth %d" % (job.name, depth)
+ if job.is_stale(arch, branch, buildid):
+ conn = QManagerClientConn(stderr = sys.stderr)
+ (code, vars) = conn.command("acquire",
+ {"name":job.name,
+ "type":"%s/%s/%s package" % \
+ (arch, branch, buildid),
+ "priority":10, "mdl":mdl})
+
+ if code[0] == "2":
+ machine=vars['machine']
+ job.id=vars['id']
+# print "Got ID %s" % job.id
+
+ worker.dispatch(machine, job, arch, branch, buildid, queue)
+ else:
+ print "[MASTER] Error acquiring job %s: %s" % (pkg.name, code)
+ else:
+ print "[MASTER] Skipping %s since it already exists" % job.name
+ for new in job.success():
+ queue.put((-new.depth, new))
+
+ print "[MASTER] Waiting for threads"
+ threads = worker.threads.copy()
+
+ for t in threads:
+ print "[MASTER] Outstanding thread: %s" % t.job.name
+
+ for t in threads:
+ print "[MASTER] Waiting for thread %s" % t.job.name
+ t.join()
+
+ print "[MASTER] Finished"
+
+if __name__ == "__main__":
+
+ try:
+ main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4:])
+ sys.exit( 0 )
+ except Exception, e:
+ # XXX MCL TODO move this above
+ print "packagebuild: Exception:"
+ try:
+ print str( e )
+ except:
+ pass
+ sys.exit( 1 )
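The scheduling policy described in the header comment comes down to simple priority arithmetic on the heap-backed queue: phase 1 jobs are keyed on -depth so the deepest dependency chains pop first, and a failed job re-enters at PHASE2_BASE_PRIO - depth, which sorts after every phase 1 key. A toy sketch of just that arithmetic, with invented port names and depths:

# Toy model of packagebuild's queue priorities; names/depths invented.
from heapq import heappush, heappop

PHASE2_BASE_PRIO = 1000

heap = []
for name, depth in [("databases/sqlite3", 12), ("misc/figlet", 1),
                    ("lang/python27", 40)]:
    heappush(heap, (-depth, name))        # phase 1: deepest chain first

# a failed build is requeued so it never preempts phase 1 work
heappush(heap, (PHASE2_BASE_PRIO - 12, "databases/sqlite3"))

while heap:
    prio, name = heappop(heap)
    print "dispatch %-20s prio %5d" % (name, prio)
# lang/python27 (prio -40) pops first; the phase 2 retry (988) pops last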
Added: projects/portbuild/qmanager/qclient
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ projects/portbuild/qmanager/qclient Sat Apr 23 21:02:25 2011 (r220976)
@@ -0,0 +1,242 @@
+#!/usr/bin/env python
+
+# queue manager client
+
+# TODO:
+# * pretty-print command output
+
+import socket, os, sys
+
+from optparse import OptionParser
+from qmanagerclient import *
+
+def error(msg):
+ print >>sys.stderr, "%s: %s" % (sys.argv[0], msg.rstrip())
+ sys.exit(1)
+
+def buildquery(option, opt, values, parser):
+ """
+ Turn command line options into a query description
+
+ Modifies:
+ query global
+
+ Raises:
+ ValueError if bogus arguments to numeric operators
+
+ """
+ global query
+
+ numopt = False
+ if opt in ("-m", "--machine"):
+ key="name"
+
*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***
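Although the qclient diff is truncated above, the visible part of buildquery() shows the pattern it uses: an optparse callback that folds each command-line option into a shared query description. A hedged sketch of that callback wiring, keeping only the "-m/--machine" branch visible above; the host name is made up:

# Sketch of the optparse callback pattern used by qclient's buildquery.
from optparse import OptionParser

query = {}

def buildquery(option, opt, value, parser):
    # map each option onto a key of the shared query description
    if opt in ("-m", "--machine"):
        query["name"] = value

parser = OptionParser()
parser.add_option("-m", "--machine", type="string",
                  action="callback", callback=buildquery)
(opts, args) = parser.parse_args(["-m", "gohan10"])
assert query == {"name": "gohan10"}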