Bug 1258497: actually create tasks, using the taskcluster proxy draft
author Dustin J. Mitchell <dustin@mozilla.com>
Wed, 04 May 2016 17:45:35 +0000
changeset 364482 ab5c6bac7fb1bbfad52eba9f4eada2affed07cd8
parent 364481 534aab54afdfafc02c6de7fb5220f367943b3319
child 364483 1f63e71b12e87e4318612f437e22c4ee3b2b444e
push id 17466
push user dmitchell@mozilla.com
push date Fri, 06 May 2016 18:22:06 +0000
bugs 1258497
milestone49.0a1
Bug 1258497: actually create tasks, using the taskcluster proxy This calls the TaskCluster API directly, rather than relying on the worker implementation to do so. MozReview-Commit-ID: H6ABkLS6zjR
taskcluster/mach_commands.py
taskcluster/taskgraph/create.py
taskcluster/taskgraph/kind/base.py
taskcluster/taskgraph/kind/legacy.py
taskcluster/taskgraph/test/test_create.py
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -148,16 +148,17 @@ class MachCommands(MachCommandBase):
         default="1",
         help='SCM level of this repository')
     def taskgraph_decision(self, **options):
         # load parameters from env vars, command line, etc.
         parameters = self.get_decision_parameters(options)
 
         # create a TaskGraphGenerator instance
         import taskgraph.generator
+        import taskgraph.create
         tgg = taskgraph.generator.TaskGraphGenerator(
             root=options['root'],
             log=self.log,
             parameters=parameters,
             optimization_finder=None)  # XXX
 
         # produce some artifacts
         def write_artifact(filename, data):
@@ -188,17 +189,18 @@ class MachCommands(MachCommandBase):
         # write out the full graph for reference
         write_artifact('full-task-graph.json',
                        self.taskgraph_to_json(tgg.full_task_graph))
 
         # write out the optimized task graph to describe what will happen
         write_artifact('task-graph.json',
                        self.taskgraph_to_json(tgg.optimized_task_graph))
 
-        # TODO: call the taskcluster API to create the tasks in the optimized graph
+        # actually create the graph
+        taskgraph.create.create_tasks(tgg.optimized_task_graph)
 
     ##
     # Parameter handling
 
     def load_parameters_file(self, options):
         filename = options['parameters']
         if not filename:
             return {}
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/create.py
@@ -0,0 +1,40 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import requests
+import json
+import collections
+
+from slugid import nice as slugid
+
+def create_tasks(taskgraph):
+    # TODO: use the taskGroupId of the decision task
+    task_group_id = slugid()
+    label_to_taskid = collections.defaultdict(slugid)
+
+    for label in taskgraph.graph.visit_postorder():
+        task = taskgraph.tasks[label]
+        deps_by_name = {
+            n: label_to_taskid[r]
+            for (l, r, n) in taskgraph.graph.edges
+            if l == label}
+        task_def = task.kind.get_task_definition(task, deps_by_name)
+        task_def['taskGroupId'] = task_group_id
+        task_def['dependencies'] = deps_by_name.values()
+        task_def['requires'] = 'all-completed'
+
+        _create_task(label_to_taskid[label], task_def)
+
+def _create_task(task_id, task_def):
+    # create the task using 'http://taskcluster/queue', which is proxied to the queue service
+    # with credentials appropriate to this job.
+    print("Creating task {}".format(task_id))
+    res = requests.put('http://taskcluster/queue/v1/task/{}'.format(task_id), data=json.dumps(task_def))
+    if res.status_code != 200:
+        try:
+            print(res.json()['message'])
+        except Exception:
+            print(res.text)
+        res.raise_for_status()
--- a/taskcluster/taskgraph/kind/base.py
+++ b/taskcluster/taskgraph/kind/base.py
@@ -47,8 +47,24 @@ class Kind(object):
         task.  If another task with the same optimization key has already been
         performed, it will be used directly instead of executing the task
         again.
 
         Returns a string suitable for inclusion in a TaskCluster index
         namespace (generally of the form `<optimizationName>.<hash>`), or None
         if this task cannot be optimized.
         """
+
+    @abc.abstractmethod
+    def get_task_definition(self, task, dependent_taskids):
+        """
+        Get the final task definition for the given task.  This is the time to
+        substitute actual taskIds for dependent tasks into the task definition.
+        Note that this method is only used in the decision tasks, so it should
+        not perform any processing that users might want to test or see in
+        other `mach taskgraph` commands.
+
+        The `dependent_taskids` parameter is a dictionary mapping dependency
+        name to assigned taskId.
+
+        The returned task definition will be modified before being passed to
+        `queue.createTask`.
+        """
--- a/taskcluster/taskgraph/kind/legacy.py
+++ b/taskcluster/taskgraph/kind/legacy.py
@@ -1,13 +1,14 @@
 import time
 import os
 import sys
 import json
 import copy
+import re
 import logging
 from . import base
 from taskgraph.types import Task
 
 from functools import partial
 
 from mozpack.path import match as mozpackmatch
 
@@ -31,23 +32,29 @@ from taskcluster_graph.image_builder imp
 )
 from taskcluster_graph.from_now import (
     json_time_from_now,
     current_json_time,
 )
 from taskcluster_graph.templates import Templates
 import taskcluster_graph.build_task
 
+# TASKID_PLACEHOLDER is the "internal" form of a taskid; it is substituted with
+# actual taskIds at the very last minute, in get_task_definition
+TASKID_PLACEHOLDER = 'TaskLabel=={}'
+
 ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
 DEFINE_TASK = 'queue:define-task:aws-provisioner-v1/{}'
 DEFAULT_TRY = 'try: -b do -p all -u all -t all'
 DEFAULT_JOB_PATH = os.path.join(
     'tasks', 'branches', 'base_jobs.yml'
 )
 
+def mklabel():
+    return TASKID_PLACEHOLDER.format(slugid())
 
 class LegacyKind(base.Kind):
 
     def load_tasks(self, params):
         root = os.path.abspath(os.path.join(self.path, self.config['legacy_path']))
 
         project = params['project']
         # NOTE: message is ignored here; we always use DEFAULT_TRY, then filter the
@@ -178,17 +185,17 @@ class LegacyKind(base.Kind):
         all_routes = {}
 
         for build in job_graph:
             self.log(logging.DEBUG, 'load-task', {
                 'task': build['task'],
             }, 'loading task {task}')
             interactive = cmdline_interactive or build["interactive"]
             build_parameters = merge_dicts(parameters, build['additional-parameters'])
-            build_parameters['build_slugid'] = slugid()
+            build_parameters['build_slugid'] = mklabel()
             build_parameters['source'] = '{repo}file/{rev}/testing/taskcluster/{file}'.format(repo=params['head_repository'], rev=params['head_rev'], file=build['task'])
             build_task = templates.load(build['task'], build_parameters)
 
             # Copy build_* attributes to expose them to post-build tasks
             # as well as json routes and tests
             task_extra = build_task['task']['extra']
             build_parameters['build_name'] = task_extra['build_name']
             build_parameters['build_type'] = task_extra['build_type']
@@ -276,17 +283,17 @@ class LegacyKind(base.Kind):
             for post_build in build['post-build']:
                 # copy over the old parameters to update the template
                 # TODO additional-parameters is currently not an option, only
                 # enabled for build tasks
                 post_parameters = merge_dicts(build_parameters,
                                               post_build.get('additional-parameters', {}))
                 post_task = configure_dependent_task(post_build['task'],
                                                      post_parameters,
-                                                     slugid(),
+                                                     mklabel(),
                                                      templates,
                                                      build_treeherder_config)
                 normalize_image_details(graph,
                                         post_task,
                                         seen_images,
                                         build_parameters,
                                         os.environ.get('TASK_ID', None))
                 set_interactive_task(post_task, interactive)
@@ -326,17 +333,17 @@ class LegacyKind(base.Kind):
                     if 'only_chunks' in test and chunked and \
                             chunk not in test['only_chunks']:
                         continue
 
                     if chunked:
                         test_parameters['chunk'] = chunk
                     test_task = configure_dependent_task(test['task'],
                                                          test_parameters,
-                                                         slugid(),
+                                                         mklabel(),
                                                          templates,
                                                          build_treeherder_config)
                     normalize_image_details(graph,
                                             test_task,
                                             seen_images,
                                             build_parameters,
                                             os.environ.get('TASK_ID', None))
                     set_interactive_task(test_task, interactive)
@@ -360,17 +367,17 @@ class LegacyKind(base.Kind):
                         if param in test_parameters:
                             test_task['attributes'][attr] = str(test_parameters[param])
 
                     # This will schedule test jobs N times
                     for i in range(0, trigger_tests):
                         graph['tasks'].append(test_task)
                         # If we're scheduling more tasks each have to be unique
                         test_task = copy.deepcopy(test_task)
-                        test_task['taskId'] = slugid()
+                        test_task['taskId'] = mklabel()
 
                     define_task = DEFINE_TASK.format(
                         test_task['task']['workerType']
                     )
 
                     graph['scopes'].add(define_task)
                     graph['scopes'] |= set(test_task['task'].get('scopes', []))
 
@@ -385,12 +392,28 @@ class LegacyKind(base.Kind):
         # taskId for each task, and we use those as the *labels* for the tasks;
         # taskgraph will later assign them new taskIds.
         return [Task(self, t['taskId'], task=t['task'], attributes=t['attributes'])
                 for t in self.graph['tasks']]
 
     def get_task_dependencies(self, task, taskgraph):
         # fetch dependency information from the cached graph
         taskdict = self.tasks_by_label[task.label]
-        return [(label, 'legacy') for label in taskdict.get('requires', [])]
+        return [(label, label) for label in taskdict.get('requires', [])]
 
     def get_task_optimization_key(self, task, taskgraph):
         pass
+
+    def get_task_definition(self, task, dependent_taskids):
+        # Note that the keys for `dependent_taskids` are task labels in this
+        # case, since that's how get_task_dependencies set it up.
+        placeholder_pattern = re.compile(r'TaskLabel==[a-zA-Z0-9-_]{22}')
+        def repl(mo):
+            return dependent_taskids[mo.group(0)]
+
+        # this is a cheap but easy way to replace all placeholders with
+        # actual real taskIds now that they are known.  The placeholder
+        # may be embedded in a longer string, so traversing the data structure
+        # would still require regexp matching each string and not be
+        # appreciably faster.
+        task_def = json.dumps(task.task)
+        task_def = placeholder_pattern.sub(repl, task_def)
+        return json.loads(task_def)
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/test/test_create.py
@@ -0,0 +1,57 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from taskgraph import create
+from taskgraph.graph import Graph
+from taskgraph.types import Task, TaskGraph
+
+from mozunit import main
+
+class FakeKind(object):
+
+    def get_task_definition(self, task, deps_by_name):
+        # sanity-check the deps_by_name
+        for k, v in deps_by_name.iteritems():
+            assert k == 'edge'
+        return {'payload': 'hello world'}
+
+
+class TestCreate(unittest.TestCase):
+
+    def setUp(self):
+        self.created_tasks = {}
+        self.old_create_task = create._create_task
+        create._create_task = self.fake_create_task
+
+    def tearDown(self):
+        create._create_task = self.old_create_task
+
+    def fake_create_task(self, task_id, task_def):
+        self.created_tasks[task_id] = task_def
+
+    def test_create_tasks(self):
+        kind = FakeKind()
+        tasks = {
+            'a': Task(kind=kind, label='a'),
+            'b': Task(kind=kind, label='b'),
+        }
+        graph = Graph(nodes=set('ab'), edges=set([('a', 'b', 'edge')]))
+        taskgraph = TaskGraph(tasks, graph)
+
+        create.create_tasks(taskgraph)
+
+        for tid, task in self.created_tasks.iteritems():
+            self.assertEqual(task['payload'], 'hello world')
+            # make sure the dependencies exist, at least
+            for depid in task['dependencies']:
+                self.assertIn(depid, self.created_tasks)
+
+
+if __name__ == '__main__':
+    main()
+