Bug 1258497: actually create tasks, using the taskcluster proxy
This calls the TaskCluster API directly, rather than relying on the worker
implementation to do so.
MozReview-Commit-ID: H6ABkLS6zjR
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -148,16 +148,17 @@ class MachCommands(MachCommandBase):
default="1",
help='SCM level of this repository')
def taskgraph_decision(self, **options):
# load parameters from env vars, command line, etc.
parameters = self.get_decision_parameters(options)
# create a TaskGraphGenerator instance
import taskgraph.generator
+ import taskgraph.create
tgg = taskgraph.generator.TaskGraphGenerator(
root=options['root'],
log=self.log,
parameters=parameters,
optimization_finder=None) # XXX
# produce some artifacts
def write_artifact(filename, data):
@@ -188,17 +189,18 @@ class MachCommands(MachCommandBase):
# write out the full graph for reference
write_artifact('full-task-graph.json',
self.taskgraph_to_json(tgg.full_task_graph))
# write out the optimized task graph to describe what will happen
write_artifact('task-graph.json',
self.taskgraph_to_json(tgg.optimized_task_graph))
- # TODO: call the taskcluster API to create the tasks in the optimized graph
+ # actually create the graph
+ taskgraph.create.create_tasks(tgg.optimized_task_graph)
##
# Parameter handling
def load_parameters_file(self, options):
filename = options['parameters']
if not filename:
return {}
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/create.py
@@ -0,0 +1,40 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import requests
+import json
+import collections
+
+from slugid import nice as slugid
+
+def create_tasks(taskgraph):
+    """Create the tasks in `taskgraph` via the TaskCluster queue service.
+
+    Tasks are submitted in post-order, so every task's dependencies are
+    created before the task that requires them.
+    """
+    # TODO: use the taskGroupId of the decision task
+    task_group_id = slugid()
+    # assigns a fresh slugid the first time each label is looked up
+    label_to_taskid = collections.defaultdict(slugid)
+
+    for label in taskgraph.graph.visit_postorder():
+        task = taskgraph.tasks[label]
+        # map dependency name -> taskId for the edges leaving this node
+        deps_by_name = {}
+        for left, right, name in taskgraph.graph.edges:
+            if left == label:
+                deps_by_name[name] = label_to_taskid[right]
+        task_def = task.kind.get_task_definition(task, deps_by_name)
+        task_def['taskGroupId'] = task_group_id
+        task_def['dependencies'] = deps_by_name.values()
+        task_def['requires'] = 'all-completed'
+
+        _create_task(label_to_taskid[label], task_def)
+
+def _create_task(task_id, task_def):
+    """Create the given task via the queue service.
+
+    The request goes to 'http://taskcluster/queue', which is proxied to the
+    queue service with credentials appropriate to this job.  Raises
+    requests.HTTPError if the task cannot be created.
+    """
+    print("Creating task {}".format(task_id))
+    res = requests.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
+                       data=json.dumps(task_def))
+    if res.status_code != 200:
+        # try to log the queue's error message, falling back to the raw
+        # response body; never let the logging mask the HTTP failure itself
+        try:
+            print(res.json()['message'])
+        except (ValueError, KeyError):
+            print(res.text)
+        res.raise_for_status()
--- a/taskcluster/taskgraph/kind/base.py
+++ b/taskcluster/taskgraph/kind/base.py
@@ -47,8 +47,24 @@ class Kind(object):
task. If another task with the same optimization key has already been
performed, it will be used directly instead of executing the task
again.
Returns a string suitable for inclusion in a TaskCluster index
namespace (generally of the form `<optimizationName>.<hash>`), or None
if this task cannot be optimized.
"""
+
+    @abc.abstractmethod
+    def get_task_definition(self, task, dependent_taskids):
+        """
+        Get the final task definition for the given task.  This is the time to
+        substitute actual taskIds for dependent tasks into the task definition.
+        Note that this method is only used in the decision tasks, so it should
+        not perform any processing that users might want to test or see in
+        other `mach taskgraph` commands.
+
+        The `dependent_taskids` parameter is a dictionary mapping dependency
+        name to assigned taskId.
+
+        The returned task definition will be modified (its taskGroupId,
+        dependencies, and requires properties are set) before being passed to
+        `queue.createTask`.
+        """
--- a/taskcluster/taskgraph/kind/legacy.py
+++ b/taskcluster/taskgraph/kind/legacy.py
@@ -1,13 +1,14 @@
import time
import os
import sys
import json
import copy
+import re
import logging
from . import base
from taskgraph.types import Task
from functools import partial
from mozpack.path import match as mozpackmatch
@@ -31,23 +32,29 @@ from taskcluster_graph.image_builder imp
)
from taskcluster_graph.from_now import (
json_time_from_now,
current_json_time,
)
from taskcluster_graph.templates import Templates
import taskcluster_graph.build_task
+# TASKID_PLACEHOLDER is the "internal" form of a taskid; it is substituted with
+# actual taskIds at the very last minute, in get_task_definition
+TASKID_PLACEHOLDER = 'TaskLabel=={}'
+
ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
DEFINE_TASK = 'queue:define-task:aws-provisioner-v1/{}'
DEFAULT_TRY = 'try: -b do -p all -u all -t all'
DEFAULT_JOB_PATH = os.path.join(
'tasks', 'branches', 'base_jobs.yml'
)
+def mklabel():
+    # Return a unique task label; the embedded slugid distinguishes labels
+    # generated within the same decision run.
+    return TASKID_PLACEHOLDER.format(slugid())
class LegacyKind(base.Kind):
def load_tasks(self, params):
root = os.path.abspath(os.path.join(self.path, self.config['legacy_path']))
project = params['project']
# NOTE: message is ignored here; we always use DEFAULT_TRY, then filter the
@@ -178,17 +185,17 @@ class LegacyKind(base.Kind):
all_routes = {}
for build in job_graph:
self.log(logging.DEBUG, 'load-task', {
'task': build['task'],
}, 'loading task {task}')
interactive = cmdline_interactive or build["interactive"]
build_parameters = merge_dicts(parameters, build['additional-parameters'])
- build_parameters['build_slugid'] = slugid()
+ build_parameters['build_slugid'] = mklabel()
build_parameters['source'] = '{repo}file/{rev}/testing/taskcluster/{file}'.format(repo=params['head_repository'], rev=params['head_rev'], file=build['task'])
build_task = templates.load(build['task'], build_parameters)
# Copy build_* attributes to expose them to post-build tasks
# as well as json routes and tests
task_extra = build_task['task']['extra']
build_parameters['build_name'] = task_extra['build_name']
build_parameters['build_type'] = task_extra['build_type']
@@ -276,17 +283,17 @@ class LegacyKind(base.Kind):
for post_build in build['post-build']:
# copy over the old parameters to update the template
# TODO additional-parameters is currently not an option, only
# enabled for build tasks
post_parameters = merge_dicts(build_parameters,
post_build.get('additional-parameters', {}))
post_task = configure_dependent_task(post_build['task'],
post_parameters,
- slugid(),
+ mklabel(),
templates,
build_treeherder_config)
normalize_image_details(graph,
post_task,
seen_images,
build_parameters,
os.environ.get('TASK_ID', None))
set_interactive_task(post_task, interactive)
@@ -326,17 +333,17 @@ class LegacyKind(base.Kind):
if 'only_chunks' in test and chunked and \
chunk not in test['only_chunks']:
continue
if chunked:
test_parameters['chunk'] = chunk
test_task = configure_dependent_task(test['task'],
test_parameters,
- slugid(),
+ mklabel(),
templates,
build_treeherder_config)
normalize_image_details(graph,
test_task,
seen_images,
build_parameters,
os.environ.get('TASK_ID', None))
set_interactive_task(test_task, interactive)
@@ -360,17 +367,17 @@ class LegacyKind(base.Kind):
if param in test_parameters:
test_task['attributes'][attr] = str(test_parameters[param])
# This will schedule test jobs N times
for i in range(0, trigger_tests):
graph['tasks'].append(test_task)
# If we're scheduling more tasks each have to be unique
test_task = copy.deepcopy(test_task)
- test_task['taskId'] = slugid()
+ test_task['taskId'] = mklabel()
define_task = DEFINE_TASK.format(
test_task['task']['workerType']
)
graph['scopes'].add(define_task)
graph['scopes'] |= set(test_task['task'].get('scopes', []))
@@ -385,12 +392,28 @@ class LegacyKind(base.Kind):
# taskId for each task, and we use those as the *labels* for the tasks;
# taskgraph will later assign them new taskIds.
return [Task(self, t['taskId'], task=t['task'], attributes=t['attributes'])
for t in self.graph['tasks']]
def get_task_dependencies(self, task, taskgraph):
# fetch dependency information from the cached graph
taskdict = self.tasks_by_label[task.label]
- return [(label, 'legacy') for label in taskdict.get('requires', [])]
+ return [(label, label) for label in taskdict.get('requires', [])]
def get_task_optimization_key(self, task, taskgraph):
pass
+
+    def get_task_definition(self, task, dependent_taskids):
+        """
+        Return the task definition for `task` with every TaskLabel==..
+        placeholder replaced by the corresponding real taskId from
+        `dependent_taskids`.
+        """
+        # Note that the keys for `dependent_taskids` are task labels in this
+        # case, since that's how get_task_dependencies set it up.
+        placeholder_pattern = re.compile(r'TaskLabel==[a-zA-Z0-9-_]{22}')
+        def repl(mo):
+            return dependent_taskids[mo.group(0)]
+
+        # this is a cheap but easy way to replace all placeholders with
+        # actual real taskIds now that they are known.  The placeholder
+        # may be embedded in a longer string, so traversing the data structure
+        # would still require regexp matching each string and not be
+        # appreciably faster.
+        task_def = json.dumps(task.task)
+        task_def = placeholder_pattern.sub(repl, task_def)
+        return json.loads(task_def)
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/test/test_create.py
@@ -0,0 +1,57 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from taskgraph import create
+from taskgraph.graph import Graph
+from taskgraph.types import Task, TaskGraph
+
+from mozunit import main
+
+class FakeKind(object):
+
+    def get_task_definition(self, task, deps_by_name):
+        # the test graph has exactly one edge, named 'edge', so any other
+        # dependency name means create_tasks built the mapping incorrectly
+        for dep_name in deps_by_name:
+            assert dep_name == 'edge'
+        return {'payload': 'hello world'}
+
+
+class TestCreate(unittest.TestCase):
+
+    def setUp(self):
+        # intercept task creation so no HTTP requests are made; restored
+        # in tearDown
+        self.created_tasks = {}
+        self.old_create_task = create._create_task
+        create._create_task = self.fake_create_task
+
+    def tearDown(self):
+        create._create_task = self.old_create_task
+
+    def fake_create_task(self, task_id, task_def):
+        # record the task definition instead of creating it
+        self.created_tasks[task_id] = task_def
+
+    def test_create_tasks(self):
+        kind = FakeKind()
+        tasks = {
+            'a': Task(kind=kind, label='a'),
+            'b': Task(kind=kind, label='b'),
+        }
+        graph = Graph(nodes=set('ab'), edges=set([('a', 'b', 'edge')]))
+        taskgraph = TaskGraph(tasks, graph)
+
+        create.create_tasks(taskgraph)
+
+        for tid, task in self.created_tasks.iteritems():
+            self.assertEqual(task['payload'], 'hello world')
+            # make sure the dependencies exist, at least
+            for depid in task['dependencies']:
+                self.assertIn(depid, self.created_tasks)
+
+
+if __name__ == '__main__':
+ main()
+