Bug 1278406 - Use a thread pool for submitting tasks to queue; r?dustin

Currently, Gecko decision tasks spend ~8 minutes making serial HTTP requests
to the queue API, one per task. Let's throw a thread pool at it so tasks are
submitted faster.

This could be faster still if the queue exposed a single batch submit API.
However, that isn't implemented, for technical reasons described in the bug.
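
For reference, the submission pattern this patch adopts can be sketched in
isolation roughly as below. This is illustrative only and not part of the
patch: postorder(), submit_all(), fake_create(), the toy graph, and the pool
size of 10 are hypothetical stand-ins for taskgraph.graph.visit_postorder(),
the loop in create_tasks(), and requests.adapters.DEFAULT_POOLSIZE.

    import concurrent.futures as futures

    def postorder(graph):
        # Order nodes so that each node's dependencies come before it.
        # graph maps node -> list of dependency nodes and is assumed acyclic.
        seen, order = set(), []
        def visit(node):
            if node in seen:
                return
            seen.add(node)
            for dep in graph[node]:
                visit(dep)
            order.append(node)
        for node in graph:
            visit(node)
        return order

    def submit_all(graph, create, pool_size=10):
        # Run create(node) on a thread pool, never submitting a node until
        # every one of its dependencies has finished submitting.
        with futures.ThreadPoolExecutor(pool_size) as executor:
            fs = {}
            for node in postorder(graph):
                # Block until the already-submitted dependencies complete.
                deps = [fs[d] for d in graph[node] if d in fs]
                for f in futures.as_completed(deps):
                    f.result()  # re-raises if a dependency's create() failed
                fs[node] = executor.submit(create, node)
            # Surface errors from any futures not waited on above.
            for f in futures.as_completed(fs.values()):
                f.result()

    def fake_create(node):
        print('submitted %s' % node)

    # 'd' is only submitted after 'b' and 'c' finish, which follow 'a'.
    submit_all({'a': [], 'b': ['a'], 'c': ['a'], 'd': ['b', 'c']}, fake_create)

Note that the patch sizes the executor with requests.adapters.DEFAULT_POOLSIZE,
presumably so the number of worker threads matches the connection pool size of
the shared requests.Session.
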
MozReview-Commit-ID: MeHItEVBbk
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -1,15 +1,17 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import concurrent.futures as futures
 import requests
+import requests.adapters
 import json
 import collections
 import os
 import logging
 
 from slugid import nice as slugid
 
 logger = logging.getLogger(__name__)
@@ -18,27 +20,50 @@ def create_tasks(taskgraph, label_to_tas
     # TODO: use the taskGroupId of the decision task
     task_group_id = slugid()
     taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}
     session = requests.Session()
     decision_task_id = os.environ.get('TASK_ID')
 
-    for task_id in taskgraph.graph.visit_postorder():
-        task_def = taskgraph.tasks[task_id].task
-
-        # if this task has no dependencies, make it depend on this decision
-        # task so that it does not start immediately; and so that if this loop
-        # fails halfway through, none of the already-created tasks run.
-        if decision_task_id and not task_def.get('dependencies'):
-            task_def['dependencies'] = [decision_task_id]
-
-        task_def['taskGroupId'] = task_group_id
-        _create_task(session, task_id, taskid_to_label[task_id], task_def)
+    with futures.ThreadPoolExecutor(requests.adapters.DEFAULT_POOLSIZE) as e:
+        fs = {}
+
+        # We can't submit a task until its dependencies have been submitted.
+        # So our strategy is to walk the graph and submit tasks once all
+        # their dependencies have been submitted.
+        #
+        # Using visit_postorder() here isn't the most efficient: we'll
+        # block waiting for dependencies of task N to submit even though
+        # dependencies for task N+1 may be finished. If we need to optimize
+        # this further, we can build a graph of task dependencies and walk
+        # that.
+        for task_id in taskgraph.graph.visit_postorder():
+            task_def = taskgraph.tasks[task_id].task
+
+            # if this task has no dependencies, make it depend on this decision
+            # task so that it does not start immediately; and so that if this loop
+            # fails halfway through, none of the already-created tasks run.
+            if decision_task_id and not task_def.get('dependencies'):
+                task_def['dependencies'] = [decision_task_id]
+
+            task_def['taskGroupId'] = task_group_id
+
+            # Wait for dependencies before submitting this.
+            deps_fs = [fs[dep] for dep in task_def['dependencies'] if dep in fs]
+            for f in futures.as_completed(deps_fs):
+                f.result()
+
+            fs[task_id] = e.submit(_create_task, session, task_id,
+                                   taskid_to_label[task_id], task_def)
+
+        # Wait for all futures to complete.
+        for f in futures.as_completed(fs.values()):
+            f.result()
 
 
 def _create_task(session, task_id, label, task_def):
     # create the task using 'http://taskcluster/queue', which is proxied to the queue service
     # with credentials appropriate to this job.
     logger.debug("Creating task with taskId {} for {}".format(task_id, label))
     res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id), data=json.dumps(task_def))
     if res.status_code != 200:
         try: