Bug 1407452 - fix concurrent task creation to handle exceptions; r?jonasfj
The previous implementation failed to call `f.result()` when creating only one
task, thereby ignoring the error.
MozReview-Commit-ID: 3zv9kFoPZCj
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -68,45 +68,59 @@ def create_tasks(taskgraph, label_to_tas
fs = {}
# We can't submit a task until its dependencies have been submitted.
# So our strategy is to walk the graph and submit tasks once all
# their dependencies have been submitted.
tasklist = set(taskgraph.graph.visit_postorder())
alltasks = tasklist.copy()
- def schedule_tasks(f=None):
+ def schedule_tasks():
+ # bail out early if any futures have failed
+ if any(f.done() and f.exception() for f in fs.values()):
+ return
+
to_remove = set()
+ new = set()
+
+ def submit(task_id, label, task_def):
+ fut = e.submit(create_task, session, task_id, label, task_def)
+ new.add(fut)
+ fs[task_id] = fut
+
for task_id in tasklist:
task_def = taskgraph.tasks[task_id].task
# If we haven't finished submitting all our dependencies yet,
# come back to this later.
# Some dependencies aren't in our graph, so make sure to filter
# those out
deps = set(task_def.get('dependencies', [])) & alltasks
if any((d not in fs or not fs[d].done()) for d in deps):
continue
- fs[task_id] = e.submit(create_task, session, task_id,
- taskid_to_label[task_id], task_def)
+ submit(task_id, taskid_to_label[task_id], task_def)
to_remove.add(task_id)
# Schedule tasks as many times as task_duplicates indicates
attributes = taskgraph.tasks[task_id].attributes
for i in range(1, attributes.get('task_duplicates', 1)):
# We use slugid() since we want a distinct task id
- fs[task_id] = e.submit(create_task, session, slugid(),
- taskid_to_label[task_id], task_def)
+ submit(slugid(), taskid_to_label[task_id], task_def)
tasklist.difference_update(to_remove)
+ # as each of those futures complete, try to schedule more tasks
+ for f in futures.as_completed(new):
+ schedule_tasks()
+
+ # start scheduling tasks and run until everything is scheduled
schedule_tasks()
- while tasklist:
- for f in futures.as_completed(fs.values()):
- f.result()
- schedule_tasks()
+
+ # check the result of each future, raising an exception if it failed
+ for f in futures.as_completed(fs.values()):
+ f.result()
def create_task(session, task_id, label, task_def):
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
# with credentials appropriate to this job.
# Resolve timestamps
now = current_json_time(datetime_format=True)
--- a/taskcluster/taskgraph/test/test_create.py
+++ b/taskcluster/taskgraph/test/test_create.py
@@ -1,16 +1,17 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import unittest
import os
+import mock
from taskgraph import create
from taskgraph.graph import Graph
from taskgraph.taskgraph import TaskGraph
from taskgraph.task import Task
from mozunit import main
@@ -66,11 +67,30 @@ class TestCreate(unittest.TestCase):
graph = Graph(nodes={'tid-a'}, edges=set())
taskgraph = TaskGraph(tasks, graph)
create.create_tasks(taskgraph, label_to_taskid, {'level': '4'})
for tid, task in self.created_tasks.iteritems():
self.assertEqual(task.get('dependencies'), [os.environ['TASK_ID']])
+ @mock.patch('taskgraph.create.create_task')
+ def test_create_tasks_fails_if_create_fails(self, create_task):
+ "creat_tasks fails if a single create_task call fails"
+ os.environ['TASK_ID'] = 'decisiontask'
+ tasks = {
+ 'tid-a': Task(kind='test', label='a', attributes={}, task={'payload': 'hello world'}),
+ }
+ label_to_taskid = {'a': 'tid-a'}
+ graph = Graph(nodes={'tid-a'}, edges=set())
+ taskgraph = TaskGraph(tasks, graph)
+
+ def fail(*args):
+ print("UHOH")
+ raise RuntimeError('oh noes!')
+ create_task.side_effect = fail
+
+ with self.assertRaises(RuntimeError):
+ create.create_tasks(taskgraph, label_to_taskid, {'level': '4'})
+
if __name__ == '__main__':
main()