Bug 1407452 - fix concurrent task creation to handle exceptions; r?jonasfj draft
author Dustin J. Mitchell <dustin@mozilla.com>
Wed, 11 Oct 2017 15:27:12 +0000
changeset 678566 c0d599653ffaf785dc0d543f34b5d732444a6dd8
parent 674178 11fe0a2895aab26c57bcfe61b3041d7837e954cd
child 735366 908e46955b160ebeb5af79259ba9d597f9be758d
push id 83965
push user dmitchell@mozilla.com
push date Wed, 11 Oct 2017 15:49:58 +0000
reviewers jonasfj
bugs 1407452
milestone 58.0a1
Bug 1407452 - fix concurrent task creation to handle exceptions; r?jonasfj

The previous implementation failed to call `f.result()` when creating only one task, thereby ignoring the error.

MozReview-Commit-ID: 3zv9kFoPZCj
taskcluster/taskgraph/create.py
taskcluster/taskgraph/test/test_create.py
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -68,45 +68,59 @@ def create_tasks(taskgraph, label_to_tas
         fs = {}
 
         # We can't submit a task until its dependencies have been submitted.
         # So our strategy is to walk the graph and submit tasks once all
         # their dependencies have been submitted.
         tasklist = set(taskgraph.graph.visit_postorder())
         alltasks = tasklist.copy()
 
-        def schedule_tasks(f=None):
+        def schedule_tasks():
+            # bail out early if any futures have failed
+            if any(f.done() and f.exception() for f in fs.values()):
+                return
+
             to_remove = set()
+            new = set()
+
+            def submit(task_id, label, task_def):
+                fut = e.submit(create_task, session, task_id, label, task_def)
+                new.add(fut)
+                fs[task_id] = fut
+
             for task_id in tasklist:
                 task_def = taskgraph.tasks[task_id].task
                 # If we haven't finished submitting all our dependencies yet,
                 # come back to this later.
                 # Some dependencies aren't in our graph, so make sure to filter
                 # those out
                 deps = set(task_def.get('dependencies', [])) & alltasks
                 if any((d not in fs or not fs[d].done()) for d in deps):
                     continue
 
-                fs[task_id] = e.submit(create_task, session, task_id,
-                                       taskid_to_label[task_id], task_def)
+                submit(task_id, taskid_to_label[task_id], task_def)
                 to_remove.add(task_id)
 
                 # Schedule tasks as many times as task_duplicates indicates
                 attributes = taskgraph.tasks[task_id].attributes
                 for i in range(1, attributes.get('task_duplicates', 1)):
                     # We use slugid() since we want a distinct task id
-                    fs[task_id] = e.submit(create_task, session, slugid(),
-                                           taskid_to_label[task_id], task_def)
+                    submit(slugid(), taskid_to_label[task_id], task_def)
             tasklist.difference_update(to_remove)
 
+            # as each of those futures complete, try to schedule more tasks
+            for f in futures.as_completed(new):
+                schedule_tasks()
+
+        # start scheduling tasks and run until everything is scheduled
         schedule_tasks()
-        while tasklist:
-            for f in futures.as_completed(fs.values()):
-                f.result()
-            schedule_tasks()
+
+        # check the result of each future, raising an exception if it failed
+        for f in futures.as_completed(fs.values()):
+            f.result()
 
 
 def create_task(session, task_id, label, task_def):
     # create the task using 'http://taskcluster/queue', which is proxied to the queue service
     # with credentials appropriate to this job.
 
     # Resolve timestamps
     now = current_json_time(datetime_format=True)
--- a/taskcluster/taskgraph/test/test_create.py
+++ b/taskcluster/taskgraph/test/test_create.py
@@ -1,16 +1,17 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import unittest
 import os
+import mock
 
 from taskgraph import create
 from taskgraph.graph import Graph
 from taskgraph.taskgraph import TaskGraph
 from taskgraph.task import Task
 
 from mozunit import main
 
@@ -66,11 +67,30 @@ class TestCreate(unittest.TestCase):
         graph = Graph(nodes={'tid-a'}, edges=set())
         taskgraph = TaskGraph(tasks, graph)
 
         create.create_tasks(taskgraph, label_to_taskid, {'level': '4'})
 
         for tid, task in self.created_tasks.iteritems():
             self.assertEqual(task.get('dependencies'), [os.environ['TASK_ID']])
 
+    @mock.patch('taskgraph.create.create_task')
+    def test_create_tasks_fails_if_create_fails(self, create_task):
+        "creat_tasks fails if a single create_task call fails"
+        os.environ['TASK_ID'] = 'decisiontask'
+        tasks = {
+            'tid-a': Task(kind='test', label='a', attributes={}, task={'payload': 'hello world'}),
+        }
+        label_to_taskid = {'a': 'tid-a'}
+        graph = Graph(nodes={'tid-a'}, edges=set())
+        taskgraph = TaskGraph(tasks, graph)
+
+        def fail(*args):
+            print("UHOH")
+            raise RuntimeError('oh noes!')
+        create_task.side_effect = fail
+
+        with self.assertRaises(RuntimeError):
+            create.create_tasks(taskgraph, label_to_taskid, {'level': '4'})
+
 
 if __name__ == '__main__':
     main()