Bug 1286075: introduce job descriptions and implementations; r?gps draft
authorDustin J. Mitchell <dustin@mozilla.com>
Mon, 12 Sep 2016 15:53:14 +0000
changeset 412740 698c2ad78bc6bc706eb3826c6d7f749f8c8d97ba
parent 412739 484a9afc12893a2f2f4a63f9a7e4a53b536c64e4
child 412741 f69e076ee8f41d937ec2ede1dd5dfde3b3f33448
push id29252
push userdmitchell@mozilla.com
push dateMon, 12 Sep 2016 19:16:39 +0000
reviewersgps
bugs1286075
milestone51.0a1
Bug 1286075: introduce job descriptions and implementations; r?gps MozReview-Commit-ID: HNXPjt3XnXe
taskcluster/docs/transforms.rst
taskcluster/taskgraph/transforms/job/__init__.py
taskcluster/taskgraph/transforms/job/run_task.py
--- a/taskcluster/docs/transforms.rst
+++ b/taskcluster/docs/transforms.rst
@@ -75,19 +75,23 @@ using :func:`taskgraph.transform.base.ge
 
 Organization
 -------------
 
 Task creation operates broadly in a few phases, with the interfaces of those
 stages defined by schemas.  The process begins with the raw data structures
 parsed from the YAML files in the kind configuration.  This data can processed
 by kind-specific transforms resulting, for test jobs, in a "test description".
-The shared test-description transforms then create a "task description", which
-the task-generation transforms then convert into a task definition suitable for
-``queue.createTask``.
+For non-test jobs, the next step is a "job description".  These transformations
+may also "duplicate" tasks, for example to implement chunking or several
+variations of the same task.
+
+In any case, shared transforms then convert this into a "task description",
+which the task-generation transforms then convert into a task definition
+suitable for ``queue.createTask``.
 
 Test Descriptions
 -----------------
 
 The transforms configured for test kinds proceed as follows, based on
 configuration in ``kind.yml``:
 
  * The test description is validated to conform to the schema in
@@ -117,16 +121,35 @@ configuration in ``kind.yml``:
    worker options, and so on.
 
  * Finally, the ``taskgraph.transforms.task:transforms``, described above
    under "Task-Generation Transforms", are applied.
 
 Test dependencies are produced in the form of a dictionary mapping dependency
 name to task label.
 
+Job Descriptions
+----------------
+
+A job description says what to run in the task.  It is a combination of a
+``run`` section and all of the fields from a task description.  The run section
+has a ``using`` property that defines how this task should be run; for example,
+``mozharness`` to run a mozharness script, or ``mach`` to run a mach command.
+The remainder of the run section is specific to the run-using implementation.
+
+The effect of a job description is to say "run this thing on this worker".  The
+job description must contain enough information about the worker to identify
+the workerType and the implementation (docker-worker, generic-worker, etc.).
+Any other task-description information is passed along verbatim, although it is
+augmented by the run-using implementation.
+
+The run-using implementations are all located in
+``taskcluster/taskgraph/transforms/job``, along with the schemas for their
+implementations.  Those source files are the canonical documentation.
+
 Task Descriptions
 -----------------
 
 Every kind needs to create tasks, and all of those tasks have some things in
 common.  They all run on one of a small set of worker implementations, each
 with their own idiosyncracies.  And they all report to TreeHerder in a similar
 way.
 
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/transforms/job/__init__.py
@@ -0,0 +1,163 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+"""
+Convert a job description into a task description.
+
+Job descriptions are similar to task descriptions, but they specify how to run
+the job at a higher level, using a "run" field that can be interpreted by
+run-using handlers in `taskcluster/taskgraph/transforms/job`.
+"""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import copy
+import logging
+import os
+
+from taskgraph.transforms.base import validate_schema, TransformSequence
+from taskgraph.transforms.task import task_description_schema
+from voluptuous import (
+    Optional,
+    Required,
+    Schema,
+    Extra,
+)
+
+logger = logging.getLogger(__name__)
+
+# Voluptuous uses marker objects as dictionary *keys*, but they are not
+# comparable, so we cast all of the keys back to regular strings.  Note that
+# this rebinds the imported `task_description_schema` name: from here on it is
+# a plain dict mapping field name to that field's sub-schema.
+task_description_schema = {str(k): v for k, v in task_description_schema.schema.iteritems()}
+
+# Schema for a job description
+job_description_schema = Schema({
+    # The name of the job and the job's label.  At least one must be specified,
+    # and the label will be generated from the name if necessary, by prepending
+    # the kind.
+    Optional('name'): basestring,
+    Optional('label'): basestring,
+
+    # the following fields are passed directly through to the task description,
+    # possibly modified by the run implementation.  See
+    # taskcluster/taskgraph/transforms/task.py for the schema details.
+    Required('description'): task_description_schema['description'],
+    Optional('attributes'): task_description_schema['attributes'],
+    Optional('dependencies'): task_description_schema['dependencies'],
+    Optional('expires-after'): task_description_schema['expires-after'],
+    Optional('routes'): task_description_schema['routes'],
+    Optional('scopes'): task_description_schema['scopes'],
+    Optional('extra'): task_description_schema['extra'],
+    Optional('treeherder'): task_description_schema['treeherder'],
+    Optional('index'): task_description_schema['index'],
+    Optional('run-on-projects'): task_description_schema['run-on-projects'],
+    Optional('coalesce-name'): task_description_schema['coalesce-name'],
+    Optional('worker-type'): task_description_schema['worker-type'],
+    Required('worker'): task_description_schema['worker'],
+
+    # A description of how to run this job.
+    'run': {
+        # The key to a job implementation in a peer module to this one
+        'using': basestring,
+
+        # Any remaining content is verified against that job implementation's
+        # own schema.
+        Extra: object,
+    },
+})
+
+# The transforms exported by this module; the functions decorated with
+# `@transforms.add` below are registered on it in definition order.
+transforms = TransformSequence()
+
+
+@transforms.add
+def validate(config, jobs):
+    """Validate each job description against `job_description_schema`.
+
+    Yields the result of `validate_schema` for each job, in order.  The error
+    prefix identifies the job by its `name`, falling back to its `label`,
+    because the schema allows either of the two to be omitted.
+    """
+    for job in jobs:
+        # `name` is optional, so indexing it directly would raise KeyError for
+        # a job that only supplies a label, before validation even starts
+        identifier = job.get('name', job.get('label'))
+        yield validate_schema(job_description_schema, job,
+                              "In job {!r}:".format(identifier))
+
+
+@transforms.add
+def make_task_description(config, jobs):
+    """Given a job description, create a task description.
+
+    For each job this ensures a `label` exists (derived from `config.kind` and
+    the job's `name` when no label was given), drops `name`, and yields a deep
+    copy of the job with the `run` section removed after the matching
+    run-using function has had a chance to modify it.
+
+    Raises an Exception if a job has neither a `name` nor a `label`.  Note
+    that the incoming job dictionaries are modified in place (label added,
+    name removed).
+    """
+    # import plugin modules first, before iterating over jobs
+    import_all()
+    for job in jobs:
+        if 'label' not in job:
+            if 'name' not in job:
+                raise Exception("job has neither a name nor a label")
+            job['label'] = '{}-{}'.format(config.kind, job['name'])
+        # the name is only needed to build the label; it may be absent when
+        # the job supplied a label directly, so remove it without assuming
+        # that the key exists
+        job.pop('name', None)
+
+        taskdesc = copy.deepcopy(job)
+
+        # fill in some empty defaults to make run implementations easier
+        taskdesc.setdefault('attributes', {})
+        taskdesc.setdefault('dependencies', {})
+        taskdesc.setdefault('routes', [])
+        taskdesc.setdefault('scopes', [])
+        taskdesc.setdefault('extra', {})
+
+        # give the function for job.run.using on this worker implementation a
+        # chance to set up the task description.
+        configure_taskdesc_for_run(config, job, taskdesc)
+        del taskdesc['run']
+
+        # yield only the task description, discarding the job description
+        yield taskdesc
+
+# A registry of all functions decorated with run_job_using.  It is keyed first
+# by the `run.using` value and then by worker implementation; each entry is a
+# (function, schema) tuple, where schema may be None.
+registry = {}
+
+
+def run_job_using(worker_implementation, run_using, schema=None):
+    """Register the decorated function as able to set up a task description for
+    jobs with the given worker implementation and `run.using` property.  If
+    `schema` is given, the job's run field will be verified to match it.
+
+    The decorated function should have the signature `using_foo(config, job,
+    taskdesc)` and should modify the task description in-place.  The skeleton of
+    the task description is already set up, but without a payload.
+
+    Raises an Exception if a function is already registered for the same
+    (worker_implementation, run_using) pair.  The decorated function itself is
+    returned unchanged."""
+    def wrap(func):
+        for_run_using = registry.setdefault(run_using, {})
+        if worker_implementation in for_run_using:
+            # entries are keyed by worker implementation; report the existing
+            # one, with arguments in the same order as this function's signature
+            raise Exception("run_job_using({!r}, {!r}) already exists: {!r}".format(
+                worker_implementation, run_using,
+                for_run_using[worker_implementation]))
+        for_run_using[worker_implementation] = (func, schema)
+        return func
+    return wrap
+
+
+def configure_taskdesc_for_run(config, job, taskdesc):
+    """
+    Look up the function registered for this job's `run.using` value and
+    worker implementation, and call it with the given task description.
+
+    If a schema was registered alongside the function, `job['run']` is first
+    replaced by the result of validating it against that schema.
+
+    An Exception is raised when nothing is registered for the `run.using`
+    value, or for that value on the job's worker implementation.
+    """
+    using = job['run']['using']
+    if using not in registry:
+        raise Exception("no functions for run.using {!r}".format(using))
+    by_worker = registry[using]
+
+    implementation = job['worker']['implementation']
+    if implementation not in by_worker:
+        raise Exception("no functions for run.using {!r} on {!r}".format(
+            using, implementation))
+
+    handler, run_schema = by_worker[implementation]
+    if run_schema:
+        context = "In job.run using {!r} for job {!r}:".format(using, job['label'])
+        job['run'] = validate_schema(run_schema, job['run'], context)
+
+    handler(config, job, taskdesc)
+
+
+def import_all():
+    """Import all modules that are siblings of this one, triggering the
+    `run_job_using` decorator in the process.
+
+    Every `*.py` file in this package's directory is imported as
+    `taskgraph.transforms.job.<module>`, except `__init__.py` and the shared
+    helper module `common.py`.  Files are visited in sorted order so that the
+    import order does not depend on the directory listing order."""
+    skipped = ('common.py', '__init__.py')
+    for f in sorted(os.listdir(os.path.dirname(__file__))):
+        if f.endswith('.py') and f not in skipped:
+            __import__('taskgraph.transforms.job.' + f[:-3])
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/transforms/job/run_task.py
@@ -0,0 +1,77 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+"""
+Support for running jobs that are invoked via the `run-task` script.
+"""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import copy
+
+from taskgraph.transforms.job import run_job_using
+from voluptuous import Schema, Required, Any
+
+# Schema for the `run` section of a job with `using: run-task`; it is passed
+# to `run_job_using` below so that the section is validated before use.
+run_task_schema = Schema({
+    Required('using'): 'run-task',
+
+    # if true, add a cache at ~worker/.cache, which is where things like pip
+    # tend to hide their caches.  This cache is never added for level-1 jobs.
+    # Defaults to false.
+    Required('cache-dotcache', default=False): bool,
+
+    # if true (the default), perform a checkout in /home/worker/checkouts/gecko
+    Required('checkout', default=True): bool,
+
+    # The command arguments to pass to the `run-task` script, after the
+    # checkout arguments.  If a list, it will be passed directly; otherwise
+    # it will be included in a single argument to `bash -cx`.
+    Required('command'): Any([basestring], basestring),
+})
+
+
+@run_job_using("docker-worker", "run-task", schema=run_task_schema)
+def docker_worker_run_task(config, job, taskdesc):
+    """Configure the docker-worker payload for a job that uses `run-task`.
+
+    Replaces `taskdesc['worker']` with a deep copy of `job['worker']` and adds
+    caches, environment variables and the `run-task` command line to it, based
+    on `job['run']` and `config.params`.  When a checkout is requested, the
+    hgfingerprint secret scope is added to `taskdesc['scopes']` and the
+    taskcluster proxy is enabled.
+
+    Caches and environment variables already present in the job's worker
+    section are kept; the values set here are added to them.  The task
+    description is modified in place and nothing is returned.
+    """
+    run = job['run']
+    checkout = run['checkout']
+
+    worker = taskdesc['worker'] = copy.deepcopy(job['worker'])
+
+    if checkout:
+        worker.setdefault('caches', []).extend([{
+            'type': 'persistent',
+            'name': 'level-{}-hg-shared'.format(config.params['level']),
+            'mount-point': "/home/worker/hg-shared",
+        }, {
+            'type': 'persistent',
+            'name': 'level-{}-checkouts'.format(config.params['level']),
+            'mount-point': "/home/worker/checkouts",
+        }])
+
+    if run.get('cache-dotcache') and int(config.params['level']) > 1:
+        # the cache list may not exist yet when no checkout was requested
+        worker.setdefault('caches', []).append({
+            'type': 'persistent',
+            'name': 'level-{level}-{project}-dotcache'.format(**config.params),
+            'mount-point': '/home/worker/.cache',
+        })
+
+    # add to, rather than replace, any environment the job already specified
+    env = worker.setdefault('env', {})
+    env.update({
+        'GECKO_BASE_REPOSITORY': config.params['base_repository'],
+        'GECKO_HEAD_REPOSITORY': config.params['head_repository'],
+        'GECKO_HEAD_REV': config.params['head_rev'],
+    })
+
+    # give the task access to the hgfingerprint secret
+    if checkout:
+        taskdesc.setdefault('scopes', []).append(
+            'secrets:get:project/taskcluster/gecko/hgfingerprint')
+        worker['taskcluster-proxy'] = True
+
+    # a string command is run through bash; a list is passed through as-is
+    run_command = run['command']
+    if isinstance(run_command, basestring):
+        run_command = ['bash', '-cx', run_command]
+    command = ['/home/worker/bin/run-task']
+    if checkout:
+        command.append('--vcs-checkout=/home/worker/checkouts/gecko')
+    command.append('--')
+    command.extend(run_command)
+    worker['command'] = command