svn commit: r220976 - in projects/portbuild: conf qmanager scripts

Florent Thoumie flz at FreeBSD.org
Sat Apr 23 21:02:26 UTC 2011


Author: flz
Date: Sat Apr 23 21:02:25 2011
New Revision: 220976
URL: http://svn.freebsd.org/changeset/base/220976

Log:
  Import qmanager into portbuild.

Added:
  projects/portbuild/qmanager/
  projects/portbuild/qmanager/acl.py
  projects/portbuild/qmanager/dumpdb.py   (contents, props changed)
  projects/portbuild/qmanager/packagebuild   (contents, props changed)
  projects/portbuild/qmanager/qclient   (contents, props changed)
  projects/portbuild/qmanager/qmanager   (contents, props changed)
  projects/portbuild/qmanager/qmanager.py   (contents, props changed)
  projects/portbuild/qmanager/qmanagerclient.py
  projects/portbuild/qmanager/qmanagerhandler.py
  projects/portbuild/qmanager/qmanagerobj.py
  projects/portbuild/qmanager/schema.sql
Modified:
  projects/portbuild/conf/server.conf
  projects/portbuild/scripts/dopackages

Modified: projects/portbuild/conf/server.conf
==============================================================================
--- projects/portbuild/conf/server.conf	Sat Apr 23 20:59:58 2011	(r220975)
+++ projects/portbuild/conf/server.conf	Sat Apr 23 21:02:25 2011	(r220976)
@@ -72,7 +72,6 @@ PDISPATCH_TIMEOUT=360000
 # qmanager definitions (note: Python script, so avoid {})
 #
 
-QMANAGER_PATH=/var/portbuild/evil/qmanager
 QMANAGER_DATABASE_FILE=qdb.sl3
 QMANAGER_SOCKET_FILE=/tmp/.qmgr
 

Added: projects/portbuild/qmanager/acl.py
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ projects/portbuild/qmanager/acl.py	Sat Apr 23 21:02:25 2011	(r220976)
@@ -0,0 +1,156 @@
+# Validate a (uid, (gids)) tuple against an ACL.
+
+import pwd, grp
+
+def getuidbyname(username):
+    """Resolve a user name to a numeric uid.
+
+    Accepts an int or all-digit string (returned as int) or a login
+    name looked up in the password database (KeyError if unknown).
+    """
+    if str(username).isdigit():
+        return int(username)
+    return pwd.getpwnam(username)[2]
+
+def getgidbyname(grname):
+    """Resolve a group name to a numeric gid.
+
+    Accepts an int or all-digit string (returned as int) or a group
+    name looked up in the group database (KeyError if unknown).
+    """
+    if str(grname).isdigit():
+        return int(grname)
+    return grp.getgrnam(grname)[2]
+
+class ACLElement(object):
+    """ Component of an ACL. """
+
+    def __init__(self, name, uidlist, gidlist, sense):
+        # name: label for this element (informational only).
+        # uidlist/gidlist: names or numeric ids; resolved eagerly, so an
+        # unknown name raises KeyError here rather than at validate() time.
+        # sense: the True/False verdict this element yields when it matches.
+        self.name = name
+        self.uidlist = [getuidbyname(uid) for uid in uidlist]
+        self.gidlist = [getgidbyname(gid) for gid in gidlist]
+        self.sense = bool(sense)
+
+    def validate(self, uid, gids):
+        """ Validate an ACL Element.  In order to match, the following must
+        hold:
+
+        * uid is present in self.uidlist, or self.uidlist is empty
+        * at least one of the gids must be present in self.gidlist, or
+          self.gidlist is empty
+
+        If both conditions hold, then the validation returns self.sense
+
+        Returns: True/False if Element matches
+        None if Element fails to match
+        """
+
+        # Empty uid/gid lists act as wildcards; otherwise require uid
+        # membership and a non-empty gid intersection, respectively.
+        if (len(self.uidlist) == 0 or uid in self.uidlist) and \
+                (len(self.gidlist) == 0 or set(gids).intersection(self.gidlist)):
+            return self.sense
+        return None
+
+class ACL(object):
+    """ List of ACLElements that form an ACL """
+
+    def __init__(self, acllist):
+        # Ordered sequence of ACLElement; order matters -- the first
+        # element that matches decides the result.
+        self.acls = acllist
+
+    def validate(self, uid, gids):
+        """Return the sense of the first matching element; default-deny
+        (False) when no element matches."""
+        # Normalize names to numeric ids so elements compare uniformly.
+        uid=getuidbyname(uid)
+        gids=set(getgidbyname(gid) for gid in gids)
+
+        for acl in self.acls:
+            res=acl.validate(uid, gids)
+            if res is not None:
+                return res
+        return False
+
+if __name__ == "__main__":
+
+    # Self-tests.  NOTE(review): these depend on local accounts and
+    # groups ('kris', 'simon', 'root', 'wheel', 'devel', 'staff',
+    # 'distcc') existing on the host running them -- confirm before use.
+    from sys import exit
+
+    assert getuidbyname(123) == 123
+    assert getuidbyname('123') == 123
+
+    # Unknown names should raise KeyError from the pwd/grp lookup.
+    # NOTE(review): if no exception is raised these checks pass silently;
+    # an 'else: raise AssertionError' would make them real tests.
+    try:
+        ACLElement("test", ["foobar"], [""], True)
+    except KeyError:
+        pass
+
+    try:
+        ACLElement("test", [123, "foobar"], [""], True)
+    except KeyError:
+        pass
+
+    # NOTE(review): these asserts are vacuous -- the constructor always
+    # returns an instance, which compares != None unconditionally.
+    assert ACLElement("test", [123], [], True) != None
+    assert ACLElement("test", ["123"], [], True) != None
+
+    # Allow 'kris', allow anyone in 'wheel', deny everyone else.
+    acl = ACL([ACLElement("el 1", ["kris"], [], True),
+               ACLElement("el 2", [], ["wheel"], True),
+               ACLElement("el 3", [], [], False)])
+
+    assert acl.validate(getuidbyname('kris'), []) == True
+    assert acl.validate(getuidbyname('simon'), []) == False
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('root'), [pwd.getpwnam('root')[3]]) == True
+
+    # 'kris' only matches element 1 when also in 'distcc'; otherwise
+    # falls through to the 'wheel' / default-deny elements.
+    acl = ACL([ACLElement("el 1", ["kris"], ["distcc"], True),
+               ACLElement("el 2", [], ["wheel"], True),
+               ACLElement("el 3", [], [], False)])
+    assert acl.validate("kris", ["wheel"]) == True
+    assert acl.validate("kris", ["staff"]) == False
+
+    acl = ACL([ACLElement("", ('kris',), (), True),
+               ACLElement("", (), ('wheel', 'devel'), True),
+               ACLElement("", (), (), False)])
+
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+    acl = ACL([ACLElement("", ('kris',), (), True),
+               ACLElement("", (), ('devel',), True),
+               ACLElement("", (), ('wheel',), True),
+               ACLElement("", (), (), False)])
+
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+    # Order sensitivity: a deny element earlier in the list wins over a
+    # later allow element for the same principal.
+    acl = ACL([ACLElement("", ('kris',), (), True),
+               ACLElement("", (), ('devel',), False),
+               ACLElement("", (), ('wheel',), True),
+               ACLElement("", (), (), False)])
+
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == False
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+
+    acl = ACL([ACLElement("", ('kris',), (), True),
+               ACLElement("", (), ('devel',), True),
+               ACLElement("", (), ('wheel',), False),
+               ACLElement("", (), (), False)])
+
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == False
+
+
+    # Default-allow variant: final catch-all element has sense=True.
+    acl = ACL([ACLElement("", ('kris',), (), True),
+               ACLElement("", (), ('devel',), True),
+               ACLElement("", (), ('wheel',), False),
+               ACLElement("", (), (), True)])
+
+    assert acl.validate(getuidbyname('simon'), []) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == True
+
+    acl = ACL([ACLElement("", ('kris',), (), False),
+               ACLElement("", (), ('devel',), True),
+               ACLElement("", (), ('wheel',), False),
+               ACLElement("", (), (), True)])
+
+    assert acl.validate(getuidbyname('simon'), []) == True
+    assert acl.validate(getuidbyname('kris'), []) == False
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('devel'), getgidbyname('wheel')]) == True
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff'), getgidbyname('wheel')]) == False
+    assert acl.validate(getuidbyname('simon'), [getgidbyname('staff')]) == True
+
+    # Purely numeric principals also work (no name resolution needed).
+    acl = ACL([ACLElement("", (4206,), set([]), True),
+               ACLElement("", (), set([]), False)])
+
+    assert acl.validate(4206, (4206, 31337)) == True
+    assert acl.validate(4201, (4201, 31337)) == False

Added: projects/portbuild/qmanager/dumpdb.py
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ projects/portbuild/qmanager/dumpdb.py	Sat Apr 23 21:02:25 2011	(r220976)
@@ -0,0 +1,140 @@
+#
+# try doing some SQL reads as a test
+#
+from freebsd_config import *
+
+import os, threading, socket, Queue
+
+from signal import *
+from sys import exc_info
+from itertools import chain
+
+from qmanagerobj import *
+
+# Location of the portbuild server configuration file
+# (/var/portbuild/conf/server.conf).
+CONFIG_DIR="/var/portbuild"
+CONFIG_SUBDIR="conf"
+CONFIG_FILENAME="server.conf"
+
+# pieces of qmanagerobj.startup
+def obj_startup(filename):
+    """Open the sqlite database at *filename* and return an
+    (engine, session) pair, creating any missing tables.
+
+    create_engine/sessionmaker/Base come from the star import of
+    qmanagerobj (SQLAlchemy).  echo=True logs every SQL statement.
+    """
+
+    engine = create_engine('sqlite:///' + filename, echo=True)
+    Session = sessionmaker(bind=engine)
+    session = Session()
+
+    Base.metadata.create_all(engine)
+
+    return (engine, session)
+
+
+def show_acl( session ):
+    """Dump every row of the QManagerACL table, ordered by name."""
+
+    acls = session.query(QManagerACL)
+    acls = acls.order_by('name')
+
+    # Bare 'print' emits a blank separator line (Python 2).
+    print
+    print 'starting dump of acl table:'
+    print
+
+    for acl in acls:
+
+        print
+        print "name: %s" % acl.name
+        # list
+        print "uidlist: " + str( acl.uidlist )
+        # list
+        print "gidlist: " + str( acl.gidlist )
+        print "sense: " + str( acl.sense )
+
+
+def show_jobs( session ):
+    """Dump every row of the Job table, ordered by id.
+
+    Backquotes (`x`) are the Python 2 repr() shorthand.
+    """
+
+    jobs = session.query(Job)
+    jobs = jobs.order_by('id')
+
+    print
+    print 'starting dump of Job table:'
+    print
+
+    for job in jobs:
+
+        print
+        # job ID
+        print "job id: " + `job.id`
+        # Name of job
+        print "name: " + job.name
+        # priority of request
+        print "priority: " + `job.priority`
+        # job type
+        print "type: " + job.type
+        # uid of job owner
+        print "owner: " + `job.owner`
+        # gids of job owner (tuple)
+        #print str( type( job.gids ) )
+        print "gids: " + str( job.gids )
+        # machines that satisfied initial query (list)
+        #print str( type( job.machines ) )
+        print "machines: " + str( job.machines )
+        # Time job started/blocked (must not be modified when job is
+        # blocked or it will corrupt the heapq)
+        print "startttime: " + `job.starttime`
+        # initial machine description in case we have to revalidate (list)
+        # print str( type( job.mdl ) )
+        print "mdl: " + str( job.mdl )
+        # True --> job is running; False --> job is blocked
+        print "running: " + str( job.running )
+
+
+def show_machines( session ):
+    """Dump every row of the Machine table, ordered by name."""
+
+    machines = session.query(Machine)
+    machines = machines.order_by('name')
+
+    print
+    print 'starting dump of Machines table:'
+    print
+
+    for machine in machines:
+
+        print
+        print "name: %s" % machine.name
+        # list
+        print "acl: " + str( machine.acl )
+        # boolean
+        print "haszfs: " + str( machine.haszfs )
+        # boolean
+        print "online: " + str( machine.online )
+
+
+def show_machines_for_arch( engine, arch ):
+    """List the machine names matching a machine-description query
+    for *arch*.
+
+    The mdl entry is a textual predicate ("arch = <value>") which is
+    presumably translated into a SQL clause by SQL.construct (defined
+    in qmanagerobj) -- verify quoting rules there before extending it.
+    """
+
+    mdl = ["arch = %s" % arch]
+
+    q = SQL.construct(Machine, mdl)
+    res = engine.execute(Machine.__table__.select(q))
+    result = [SQL.to_dict(Machine, i) for i in res]
+
+    print
+    for machine in result:
+        print "machine for %s : %s " % ( arch, machine[ 'name' ] )
+
+
+# main
+
+if __name__ == '__main__':
+
+    print "acquiring engine and session"
+    config = getConfig( CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME )
+    # NOTE(review): this same commit removes QMANAGER_PATH from the
+    # sample server.conf; if it is absent, config.get presumably
+    # returns None and os.path.join below fails -- confirm the config
+    # still defines it.
+    QMANAGER_PATH = config.get( 'QMANAGER_PATH' )
+    QMANAGER_DATABASE_FILE = config.get( 'QMANAGER_DATABASE_FILE' )
+    (engine, session) = obj_startup( \
+        os.path.join( QMANAGER_PATH, QMANAGER_DATABASE_FILE ) )
+    print "acquired engine and session"
+    # print "engine = '" + str( engine ) + "', session = '" + str( session ) + "'"
+
+    show_acl( session )
+    show_machines( session )
+    show_jobs( session )
+
+    show_machines_for_arch( engine, 'i386' )
+    show_machines_for_arch( engine, 'amd64' )

Added: projects/portbuild/qmanager/packagebuild
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ projects/portbuild/qmanager/packagebuild	Sat Apr 23 21:02:25 2011	(r220976)
@@ -0,0 +1,649 @@
+#!/usr/bin/env python
+
+# Improved build dispatcher.  Invoked on server-side from dopackages.
+
+# We try to build leaf packages (those
+# which can be built immediately without requiring additional
+# dependencies to be built) in the order such that the ones required
+# by the longest dependency chains are built first.
+#
+# This has the effect of favouring deep parts of the package tree and
+# evening out the depth over time, hopefully avoiding the situation
+# where the entire cluster waits for a deep part of the tree to
+# build on a small number of machines
+#
+# We can dynamically respond to changes in build machine availability,
+# since the queue manager will block jobs that cannot be immediately
+# satisfied and will unblock us when a job slot becomes available.
+#
+# When a package build fails, it is requeued with a lower priority
+# such that it will rebuild again as soon as no "phase 1" packages
+# are available to build.  This prevents the cluster staying idle
+# until the last phase 1 package builds.
+#
+# Other advantages are that this system is easily customizable and in
+# the future will let us customize things like the matching policy of
+# jobs to machines.  For example, we could avoid dispatching multiple
+# openoffice builds to the same system.
+#
+# TODO:
+# * Combine build prep stages?
+#    - initial check for file up-to-date
+# * check mtime for package staleness (cf make)
+# * option to skip phase 2
+
+from qmanagerclient import *
+
+from freebsd_config import *
+
+import os, string, sys, threading, time, subprocess
+#import random
+from itertools import chain
+#import gc
+from stat import *
+
+from Queue import Queue
+from heapq import *
+
+CONFIG_DIR="/var/portbuild"
+CONFIG_SUBDIR="conf"
+CONFIG_FILENAME="server.conf"
+
+# Tuning knobs read once at import time from server.conf.
+config = getConfig( CONFIG_DIR, CONFIG_SUBDIR, CONFIG_FILENAME )
+# Give up on a single job after this many failed attempts.
+QMANAGER_MAX_JOB_ATTEMPTS   = int( \
+    config.get( 'QMANAGER_MAX_JOB_ATTEMPTS' ) )
+# Port-name prefixes that get an artificial depth boost (built early).
+QMANAGER_PRIORITY_PACKAGES  = string.split( \
+    config.get( 'QMANAGER_PRIORITY_PACKAGES' ) )
+# Abort the whole run when failed/completed exceeds this ratio ...
+QMANAGER_RUNAWAY_PERCENTAGE = float( \
+    config.get( 'QMANAGER_RUNAWAY_PERCENTAGE' ) )
+# ... once at least this many jobs have completed.
+QMANAGER_RUNAWAY_THRESHOLD  = int( \
+    config.get( 'QMANAGER_RUNAWAY_THRESHOLD' ) )
+
+DEBUG = False
+
+# Global registries keyed by name, populated while parsing the INDEX.
+categories = {}
+ports = {}
+
+# When a build fails we requeue it with a lower priority such that it
+# will never preempt a phase 1 build but will build when spare
+# capacity is available.
+PHASE2_BASE_PRIO=1000
+
+# Process success quickly so other jobs are started
+SUCCESS_PRIO = -1000
+
+# Failure should be a less common event :)
+FAILURE_PRIO = -900
+
+# Port status codes
+PENDING = 1 # Yet to build
+PHASE2 = 2  # Failed once
+
+class PriorityQueue(Queue):
+    """Variant of Queue that retrieves open entries in
+    priority order (lowest first).
+    Entries are typically tuples of the form:  (priority number,
+    data)
+    This class can be found at: Python-2.6a3/Lib/Queue.py
+    """
+    # 0 means unbounded (Queue treats maxsize <= 0 as "no limit").
+    maxsize = 0
+
+    def _init(self, maxsize):
+        # Backing store is a heapq-managed list instead of a deque.
+        self.queue = []
+
+    def _qsize(self, len=len):
+        return len(self.queue)
+
+    def _put(self, item, heappush=heappush):
+        heappush(self.queue, item)
+
+    def _get(self, heappop=heappop):
+        return heappop(self.queue) 
+
+class Index(object):
+    """Parser for a ports INDEX file; populates the global 'ports'
+    and 'categories' registries."""
+
+    def __init__(self, indexfile):
+        self.indexfile = indexfile
+
+    def parse(self, targets = None):
+        """Parse the INDEX file.
+
+        targets: optional collection of port names to restrict to;
+        None means parse everything.
+
+        Two-phase: phase 1 creates all Port objects, phase 2 wires up
+        dependencies once every Port exists.  The 'x = None; del x'
+        pairs drop large intermediates to release memory early.
+        """
+
+        print "[MASTER] Read index"
+        f = file(self.indexfile)
+        index = f.readlines()
+        f.close()
+        f = None
+        del f
+
+        lines=[]
+        print "[MASTER] Phase 1"
+        # INDEX format: 13 pipe-separated fields per line.
+        for i in index:
+            (name, path, prefix, comment, descr, maintainer, categories, bdep,
+             rdep, www, edep, pdep, fdep) = i.rstrip().split("|")
+
+            if targets is None or name in targets:
+                lines.append((name, bdep, rdep, edep, pdep, fdep))
+
+                # Unused descriptive fields are dropped (empty strings)
+                # to keep the in-memory Port objects small.
+                Port(name, path, "", "", "", "",
+                     categories, "")
+        index = None
+        del index
+
+        print "[MASTER] Phase 2"
+        for (name, bdep, rdep, edep, pdep, fdep) in lines:
+            ports[name].setdeps(bdep, rdep, edep, pdep, fdep)
+
+        lines = None
+        del lines
+        print "[MASTER] Done"
+
+def depthindex(targets):
+    """ Initial population of depth tree """
+
+    # Each call recursively fills in .depth on the target and all of
+    # its ancestors (see Port.depth_recursive).
+    for i in targets:
+        i.depth_recursive()
+
+class Port(object):
+    """A port (package) node in the dependency graph.
+
+    Instances register themselves in the global 'ports' dict and in
+    their Category objects on construction.
+    """
+
+    def __init__(self, name, path, prefix, comment, descr, maintainer,
+                 cats, www):
+
+        # NOTE(review): assigning __slots__ inside __init__ creates a
+        # local variable with no effect; to actually save per-instance
+        # memory it must be a class attribute.
+        __slots__ = ["name", "path", "prefix", "comment", "descr",
+                     "maintainer", "www", "bdep", "rdep", "edep", "pdep",
+                     "fdep", "alldep", "parents",  "depth", "categories"]
+
+        self.name = name
+        self.path = path
+        self.prefix = prefix
+        self.comment = comment
+        self.descr = descr
+        self.maintainer = maintainer
+        self.www = www
+
+        # Dependency lists (Port references), populated later by setdeps().
+        self.bdep = []
+        self.rdep = []
+        self.edep = []
+        self.pdep = []
+        self.fdep = []
+
+        # Union of all dependency lists / reverse edges.
+        self.alldep = []
+        self.parents = []
+        # Queue-manager job id, assigned when the job is acquired. XXX
+        self.id = None # XXX
+
+        self.status = PENDING
+        self.attempts = 0
+
+        # Whether the package build has completed and is hanging around
+        # to resolve dependencies for others XXX use status
+        self.done = False
+
+        # Depth is the maximum length of the dependency chain of this port
+        self.depth = None
+
+        self.categories=[]
+        scats = cats.split()
+        if len(scats) != len(set(scats)):
+            print "[MASTER] Warning: port %s includes duplicated categories: %s" % (name, cats)
+
+        # Create Category objects on demand; 'categories' here is the
+        # module-level registry (the INDEX field was passed as 'cats').
+        for c in set(scats):
+            try:
+                cat = categories[c]
+            except KeyError:
+                cat = Category(c)
+
+            self.categories.append(cat)
+            cat.add(self)
+
+        ports[name] = self
+
+    def remove(self):
+        """ Clean ourselves up but don't touch references in other objects;
+they still need to know about us as dependencies etc """
+
+        self.fdep = None
+        self.edep = None
+        self.pdep = None
+        self.bdep = None
+        self.rdep = None
+        self.alldep = None
+        self.parents = None
+
+        for cat in self.categories:
+            cat.remove(self)
+
+        ports[self.name] = None
+        del ports[self.name]
+        # NOTE(review): 'del self' only unbinds the local name; it does
+        # not destroy the object.
+        del self
+        
+    def destroy(self):
+        """ Remove a package and all references to it """
+
+        # Drop back-references from our dependencies ...
+        for pkg in self.alldep:
+            if pkg.parents is not None:
+                # Already removed but not destroyed
+                try:
+                    pkg.parents.remove(self)
+                except ValueError:
+                    continue
+        
+        # ... and forward references from our dependents.
+        for pkg in self.parents:
+            try:
+                pkg.fdep.remove(self)
+            except ValueError:
+                pass
+            try:
+                pkg.edep.remove(self)
+            except ValueError:
+                pass
+            try:
+                pkg.pdep.remove(self)
+            except ValueError:
+                pass
+            try:
+                pkg.bdep.remove(self)
+            except ValueError:
+                pass
+            try:
+                pkg.rdep.remove(self)
+            except ValueError:
+                pass
+            pkg.alldep.remove(self)
+
+        sys.exc_clear()
+
+        self.remove()
+
+    def setdeps(self, bdep, rdep, edep, pdep, fdep):
+        """Wire up dependency edges from whitespace-separated INDEX
+        fields; every referenced port must already exist in 'ports'."""
+        self.fdep = [ports[p] for p in fdep.split()]
+        self.edep = [ports[p] for p in edep.split()]
+        self.pdep = [ports[p] for p in pdep.split()]
+        self.bdep = [ports[p] for p in bdep.split()]
+        self.rdep = [ports[p] for p in rdep.split()]
+
+        self.alldep = list(set(chain(self.fdep, self.edep, self.pdep,
+                                     self.bdep, self.rdep)))
+
+        # Register the reverse edge on each dependency.
+        for p in self.alldep:
+            p.parents.append(self)
+
+    def depth_recursive(self):
+
+        """
+        Recursively populate the depth tree up from a given package
+        through dependencies, assuming empty values on entries not yet
+        visited
+        """
+
+        # NOTE(review): the local 'max' shadows the builtin of the same
+        # name within this method.
+        if self.depth is None:
+            if len(self.parents) > 0:
+                max = 0
+                for i in self.parents:
+                    w = i.depth_recursive()
+                    if w > max:
+                        max = w
+                self.depth = max + 1
+            else:
+                self.depth = 1
+                for port in QMANAGER_PRIORITY_PACKAGES:
+                    if self.name.startswith(port):
+                        # Artificial boost to try and get it building earlier
+                        self.depth = 100
+        return self.depth
+
+    def destroy_recursive(self):
+        """ Remove a port and everything that depends on it """
+
+        # Worklist walk of the reverse-dependency closure.
+        parents=set([self])
+
+        while len(parents) > 0:
+            pkg = parents.pop()
+            assert pkg.depth is not None
+            parents.update(pkg.parents)
+            pkg.destroy()
+
+    def success(self):
+        """ Build succeeded and possibly uncovered some new leaves """
+
+        parents = self.parents[:]
+        self.done = True
+        self.remove()
+
+        # A parent becomes a leaf when every one of its deps is done.
+        newleafs = [p for p in parents if all(c.done for c in p.alldep)]
+        return newleafs
+
+    def failure(self):
+        """ Build failed """
+
+        self.destroy_recursive()
+
+    def packagename(self, arch, branch, buildid):
+        """ Return the path where a package may be found"""
+
+        return "/var/portbuild/%s/%s/builds/%s/packages/All/%s.tbz" \
+            % (arch, branch, buildid, self.name)
+
+    def is_stale(self, arch, branch, buildid):
+        """ Does a package need to be (re)-built?
+
+        Returns: False: if it exists and has newer mtime than all of
+        its dependencies.
+        True: otherwise
+        """
+
+        my_pkgname = self.packagename(arch, branch, buildid)
+        pkg_exists = os.path.exists(my_pkgname)
+
+        # my_mtime is only read below when pkg_exists is true, so the
+        # conditional assignment is safe (short-circuit in the return).
+        if pkg_exists:
+            my_mtime = os.stat(my_pkgname)[ST_MTIME]
+
+        dep_packages = [pkg.packagename(arch, branch, buildid)
+                        for pkg in self.alldep]
+        deps_exist = all(os.path.exists(pkg) for pkg in dep_packages)
+        return not (pkg_exists and deps_exist and
+                all(os.stat(pkg)[ST_MTIME] <= my_mtime
+                    for pkg in dep_packages))
+
+class Category(object):
+    """A ports category; registers itself in the global 'categories'
+    dict and tracks its member Port objects (keyed by the Port itself)."""
+    def __init__(self, name):
+        self.name = name
+        self.ports = {}
+        categories[name] = self
+
+    def add(self, port):
+        self.ports[port] = port
+
+    def remove(self, port):
+        # Clear before delete (mirrors the style used in Port.remove).
+        self.ports[port]=None
+        del self.ports[port]
+
+def gettargets(targets):
+    """ split command line arguments into list of packages to build.
+    Returns set or iterable of all ports that will be built including
+    dependencies """
+
+    plist = set()
+    if len(targets) == 0:
+        targets = ["all"]
+    for i in targets:
+        if i == "all":
+            return ports.itervalues()
+
+        # "<cat>-all" selects every port in a category.
+        if i.endswith("-all"):
+            cat = i.rpartition("-")[0]
+            plist.update(p.name for p in categories[cat].ports)
+        # NOTE(review): rstrip(".tbz") strips any trailing run of the
+        # characters '.', 't', 'b', 'z' -- not the literal suffix -- so
+        # e.g. a name ending in 'z' would be mangled.
+        elif i.rstrip(".tbz") in ports:
+            plist.update([ports[i.rstrip(".tbz")].name])
+        else:
+            raise KeyError, i
+
+    # Compute transitive closure of all dependencies
+    pleft=plist.copy()
+    while len(pleft) > 0:
+        pkg = pleft.pop()
+        new = [p.name for p in ports[pkg].alldep]
+        plist.update(new)
+        pleft.update(new)
+
+    # Prune everything outside the closure from the global registry.
+    for p in set(ports.keys()).difference(plist):
+        ports[p].destroy()
+
+    return [ports[p] for p in plist]
+
+class worker(threading.Thread):
+    """Daemon thread that runs one package build (via pdispatch) on one
+    machine and pushes the outcome back onto the master queue."""
+
+    # Protects threads
+    lock = threading.Lock()
+
+    # Running threads, used for collecting status
+    threads = {}
+
+    def __init__(self, mach, job, arch, branch, buildid, queue):
+        threading.Thread.__init__(self) 
+        self.machine = mach
+        self.job = job
+        self.arch = arch
+        self.branch = branch
+        self.buildid = buildid
+        self.queue = queue
+
+        # Daemon thread: don't block interpreter exit on in-flight builds.
+        self.setDaemon(True)
+
+    def run(self): 
+        pkg = self.job
+
+        print "[MASTER] Running job %s" % (pkg.name),
+        if pkg.status == PHASE2:
+            print " (phase 2)"
+        else:
+            print
+        try:
+            # Dependency package lists are passed to the build script
+            # through the environment (FD/ED/PD/BD/RD).
+            runenv={'HOME':"/root",
+                 'PATH':'/sbin:/bin:/usr/sbin:/usr/bin:/usr/games:/usr/local/sbin:/usr/local/bin:/var/portbuild/scripts',
+                 'FD':" ".join(["%s.tbz" % p.name for p in pkg.fdep]),
+                 'ED':" ".join(["%s.tbz" % p.name for p in pkg.edep]),
+                 'PD':" ".join(["%s.tbz" % p.name for p in pkg.pdep]),
+                 'BD':" ".join(["%s.tbz" % p.name for p in pkg.bdep]),
+                 'RD':" ".join(["%s.tbz" % p.name for p in pkg.rdep])}
+	    for var in ["NOCLEAN", "NO_RESTRICTED", "NOPLISTCHECK", "NO_DISTFILES", "FETCH_ORIGINAL", "TRYBROKEN" ]:
+	         if var in os.environ:
+		     runenv[var] = os.environ.get(var)
+            build = subprocess.Popen(
+                ["/bin/sh", "/var/portbuild/scripts/pdispatch",
+                 self.arch, self.branch, self.buildid, self.machine,
+                 "/var/portbuild/scripts/portbuild", "%s.tbz" % pkg.name,
+                 pkg.path],
+		 env=runenv,    
+                stderr=subprocess.STDOUT, stdout=subprocess.PIPE, bufsize=0)
+        except OSError, e:
+            # NOTE(review): on OSError 'build' is never bound, so the
+            # readline loop below raises NameError instead of aborting
+            # cleanly.
+            print >>sys.stderr, "[%s:%s]: Execution failed: %s" % \
+                (pkg.id, pkg.name, e)
+        # Relay build output line by line, tagged with the job id/name.
+        while True:
+            try:
+                line = build.stdout.readline()
+            except:
+                print "[%s:%s]: Failed reading from build script" % \
+                    (pkg.id, pkg.name)
+                break
+            if line == "":
+                break
+            print "[%s:%s] %s" % (pkg.id, pkg.name, line.rstrip())
+
+        retcode = build.wait()
+        
+#        time.sleep(random.randint(0,60))
+#
+#        r = random.random()
+#        if r < 0.1:
+#            retcode = 1
+#        elif r < 0.15:
+#            retcode = 254
+#        else:
+#            retcode = 0
+
+        # Release the job slot back to the queue manager.
+        conn = QManagerClientConn(stderr = sys.stderr)
+        # NOTE(review): 'timeout' is assigned but not used here.
+        timeout = 1
+        try:
+            (code, vars) = conn.command("release", {'id':pkg.id})
+        except RequestError, e:
+            print "[MASTER] Error releasing job %s (%s): %s" % (pkg.name, pkg.id, e.value)
+
+        if DEBUG:
+            print "[MASTER] got retcode %d from pkg %s" % (retcode, pkg.name)
+        # Exit codes: 254 = soft failure (retry later), 253 = machine
+        # setup in progress (retry now), 0 = success, else hard failure.
+        if retcode == 254:
+            # Requeue soft failure at original priority
+            # XXX exponential backoff?
+            time.sleep(60)
+#            print "Requeueing %s" % pkg.id
+            self.queue.put((-pkg.depth, pkg))
+        elif retcode == 253:
+            # setting up a machine, we should immediately retry
+            self.queue.put((-pkg.depth, pkg))
+        elif retcode == 0:
+            self.queue.put((SUCCESS_PRIO, pkg))
+        else:
+            self.queue.put((FAILURE_PRIO, pkg))
+
+        # Clean up
+        worker.lock.acquire()
+        worker.threads[self]=None
+        del worker.threads[self]
+        worker.lock.release()
+
+    @staticmethod
+    def dispatch(mach, job, arch, branch, buildid, queue):
+        """Create, register and start a worker thread for one job."""
+        wrk = worker(mach, job, arch, branch, buildid, queue)
+
+        worker.lock.acquire()
+        worker.threads[wrk] = wrk
+        worker.lock.release()
+
+        wrk.start()
+
+def main(arch, branch, buildid, args):
+    """Top-level dispatcher: parse the INDEX, prune duds, then pull
+    jobs off the priority queue and hand them to worker threads until
+    no ports remain (or a runaway/attempt limit aborts the run)."""
+    global index
+
+    basedir="/var/portbuild/"+arch+"/"+branch+"/builds/"+buildid
+    portsdir=basedir+"/ports"
+
+    # get the major branch number.
+    branchbase = branch.split("-")[ 0 ]
+    # XXX ERWLA - Ugly hack
+    branchbase = branchbase.split(".")[ 0 ]
+    indexfile=portsdir+"/INDEX-"+branchbase
+
+    print "[MASTER] parseindex..."
+    index = Index(indexfile)
+    index.parse()
+    print "[MASTER] length = %s" % len(ports)
+
+    print "[MASTER] Finding targets..."
+    targets = gettargets(args)
+
+    print "[MASTER] Calculating depth..."
+    depthindex(targets)
+
+    # Remove known-broken ports (and everything depending on them).
+    print "[MASTER] Pruning duds..."
+    dudsfile=basedir+"/duds"
+    for line in file(dudsfile):
+        try:
+            dud = ports[line.rstrip()]
+        except KeyError:
+            continue
+        print "[MASTER] Skipping %s (duds)" % dud.name
+        dud.destroy_recursive()
+
+    # Seed the queue with leaves (ports with no dependencies),
+    # highest depth first (priority = -depth).
+    queue = PriorityQueue()
+    # XXX can do this while parsing index if we prune targets/duds
+    # first
+    for pkg in ports.itervalues():
+        if len(pkg.alldep) == 0:
+            queue.put((-pkg.depth, pkg))
+
+    # XXX check osversion, pool
+    mdl=["arch = %s" % arch]
+
+    # Main work loop
+    completed_jobs = 0
+    failed_jobs    = 0
+    while len(ports) > 0:
+        print "[MASTER] Ports remaining=%s, Queue length=%s" % (len(ports), queue.qsize())
+
+        if len(ports) < 10:
+            print "[MASTER] Remaining ports: %s" % ports.keys()
+
+        # Queue entries are (priority, job); SUCCESS_PRIO/FAILURE_PRIO
+        # are sentinel priorities pushed back by worker threads.
+        (prio, job) = queue.get()
+        if DEBUG:
+            print "[MASTER] Job %s pulled from queue with prio %d" % ( job.name, prio )
+        if prio == SUCCESS_PRIO:
+            print "[MASTER] Job %s succeeded" % job.name
+            for new in job.success():
+                queue.put((-new.depth, new))
+            completed_jobs = completed_jobs + 1
+            continue
+        elif prio == FAILURE_PRIO:
+            if job.status == PHASE2:
+                print "[MASTER] Job %s failed" % job.name
+                job.failure()
+                continue
+            else:
+                # XXX MCL 20110421
+                completed_jobs = completed_jobs + 1
+                failed_jobs    = failed_jobs + 1
+                if DEBUG:
+                    print "[MASTER] jobs: %d failed jobs out of %d:" % \
+                        ( failed_jobs, completed_jobs )
+                # Runaway detection: too many failures suggests a
+                # systemic problem, so stop rather than burn the cluster.
+                if completed_jobs > QMANAGER_RUNAWAY_THRESHOLD and \
+                        float( failed_jobs ) / completed_jobs > QMANAGER_RUNAWAY_PERCENTAGE:
+                    print "[MASTER] ERROR: runaway build detected: %d failed jobs out of %d:" % \
+                        ( failed_jobs, completed_jobs )
+                    print "[MASTER] RUN TERMINATED."
+                    break
+
+                job.attempts = job.attempts + 1
+                # XXX MCL in theory, if all this code worked correctly,
+                # this condition would never trigger.  In practice,
+                # however, it does, so bomb out before filling portmgr's
+                # mbox.
+                # XXX MCL 20110422 perhaps this code has been fixed now;
+                # XXX it did not use to work:
+                if job.attempts > QMANAGER_MAX_JOB_ATTEMPTS:
+                    print "[MASTER] Job %s failed %d times; RUN TERMINATED." % ( job.name, job.attempts )
+                    break
+                else:
+                    # Requeue at low priority
+                    print "[MASTER] Job %s failed (requeued for phase 2)" % job.name
+                    job.status = PHASE2
+                    queue.put((PHASE2_BASE_PRIO-job.depth, job))
+                    continue
+        elif job.status == PHASE2:
+            # Recover the depth encoded into the phase-2 priority.
+            depth = -(prio - PHASE2_BASE_PRIO)
+        else:
+            depth = -prio
+
+        print "[MASTER] Working on job %s, depth %d" % (job.name, depth)
+        if job.is_stale(arch, branch, buildid):
+            # Ask the queue manager for a build slot on a matching machine.
+            conn = QManagerClientConn(stderr = sys.stderr)
+            (code, vars) = conn.command("acquire",
+                                        {"name":job.name,
+                                         "type":"%s/%s/%s package" % \
+                                             (arch, branch, buildid),
+                                         "priority":10, "mdl":mdl})
+
+            # 2xx response codes indicate success.
+            if code[0] == "2":
+                machine=vars['machine']
+                job.id=vars['id']
+#                print "Got ID %s" % job.id
+
+                worker.dispatch(machine, job, arch, branch, buildid, queue)
+            else:
+                # NOTE(review): 'pkg' is not defined in this scope, so
+                # this error path raises NameError -- presumably meant
+                # job.name.
+                print "[MASTER] Error acquiring job %s: %s" % (pkg.name, code)
+        else:
+            print "[MASTER] Skipping %s since it already exists" % job.name
+            for new in job.success():
+                queue.put((-new.depth, new))
+
+    print "[MASTER] Waiting for threads"
+    threads = worker.threads.copy()
+
+    for t in threads:
+        print "[MASTER] Outstanding thread: %s" % t.job.name
+
+    for t in threads:
+        print "[MASTER] Waiting for thread %s" % t.job.name
+        t.join()
+
+    print "[MASTER] Finished"
+
+if __name__ == "__main__":
+
+    # Usage: packagebuild <arch> <branch> <buildid> [targets...]
+    # sys.exit raises SystemExit, which 'except Exception' does not
+    # catch, so the success path exits cleanly from inside the try.
+    try:
+        main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4:])
+        sys.exit( 0 )
+    except Exception, e:
+        # XXX MCL TODO move this above
+        # NOTE(review): this broad handler discards the traceback,
+        # making failures hard to diagnose.
+        print "packagebuild: Exception:"
+        try:
+            print str( e )
+        except:
+            pass
+        sys.exit( 1 )

Added: projects/portbuild/qmanager/qclient
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ projects/portbuild/qmanager/qclient	Sat Apr 23 21:02:25 2011	(r220976)
@@ -0,0 +1,242 @@
+#!/usr/bin/env python
+
+# queue manager client
+
+# TODO:
+#  * pretty-print command output
+
+import socket, os, sys
+
+from optparse import OptionParser
+from qmanagerclient import *
+
+def error(msg):
+    """Print *msg* to stderr prefixed with the program name, then
+    exit with status 1."""
+    print >>sys.stderr, "%s: %s" % (sys.argv[0], msg.rstrip())
+    sys.exit(1)
+
+def buildquery(option, opt, values, parser):
+    """
+    Turn command line options into a query description
+
+    Modifies:
+    query global
+
+    Raises:
+    ValueError if bogus arguments to numeric operators
+
+    """
+    global query
+
+    numopt = False
+    if opt in ("-m", "--machine"):
+        key="name"
+

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***


More information about the svn-src-projects mailing list