Bug 1388407 - Fix timeouts in action-task graph submission
MozReview-Commit-ID: 5hoFSaiYEXD
--- a/taskcluster/taskgraph/actions/add_new_jobs.py
+++ b/taskcluster/taskgraph/actions/add_new_jobs.py
@@ -2,21 +2,19 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
from .registry import register_callback_action
-from slugid import nice as slugid
-from .util import (create_task, find_decision_task)
+from .util import (create_tasks, find_decision_task)
from taskgraph.util.taskcluster import get_artifact
-from taskgraph.util.parameterization import resolve_task_references
from taskgraph.taskgraph import TaskGraph
@register_callback_action(
name='add-new-jobs',
title='Add new jobs',
symbol='add-new',
description="Add new jobs using task labels.",
@@ -37,22 +35,16 @@ from taskgraph.taskgraph import TaskGrap
)
def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
decision_task_id = find_decision_task(parameters)
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+ to_run = []
for elem in input['tasks']:
if elem in full_task_graph.tasks:
- task = full_task_graph.tasks[elem]
-
- # fix up the task's dependencies, similar to how optimization would
- # have done in the decision
- dependencies = {name: label_to_taskid[label]
- for name, label in task.dependencies.iteritems()}
- task_def = resolve_task_references(task.label, task.task, dependencies)
- task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
- # actually create the new task
- create_task(slugid(), task_def, parameters['level'])
+ to_run.append(elem)
else:
raise Exception('{} was not found in the task-graph'.format(elem))
+
+ create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
--- a/taskcluster/taskgraph/actions/backfill.py
+++ b/taskcluster/taskgraph/actions/backfill.py
@@ -4,25 +4,23 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import logging
import requests
-from slugid import nice as slugid
from .registry import register_callback_action
-from .util import create_task
+from .util import find_decision_task, create_tasks
from taskgraph.util.taskcluster import get_artifact_from_index
-from taskgraph.util.parameterization import resolve_task_references
from taskgraph.taskgraph import TaskGraph
-PUSHLOG_TMPL = '{}json-pushes?version=2&startID={}&endID={}'
+PUSHLOG_TMPL = '{}/json-pushes?version=2&startID={}&endID={}'
INDEX_TMPL = 'gecko.v2.{}.pushlog-id.{}.decision'
logger = logging.getLogger(__name__)
@register_callback_action(
title='Backfill',
name='backfill',
@@ -74,18 +72,18 @@ def backfill_action(parameters, input, t
for push in pushes:
full_task_graph = get_artifact_from_index(
INDEX_TMPL.format(parameters['project'], push),
'public/full-task-graph.json')
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact_from_index(
INDEX_TMPL.format(parameters['project'], push),
'public/label-to-taskid.json')
+ push_params = get_artifact_from_index(
+ INDEX_TMPL.format(parameters['project'], push),
+ 'public/parameters.yml')
+ push_decision_task_id = find_decision_task(push_params)
if label in full_task_graph.tasks.keys():
- task = full_task_graph.tasks[label]
- dependencies = {name: label_to_taskid[label]
- for name, label in task.dependencies.iteritems()}
- task_def = resolve_task_references(task.label, task.task, dependencies)
- task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
- create_task(slugid(), task_def, parameters['level'])
+ create_tasks(
+ [label], full_task_graph, label_to_taskid, push_params, push_decision_task_id)
else:
logging.info('Could not find {} on {}. Skipping.'.format(label, push))
--- a/taskcluster/taskgraph/actions/mochitest_retrigger.py
+++ b/taskcluster/taskgraph/actions/mochitest_retrigger.py
@@ -6,17 +6,17 @@
from __future__ import absolute_import, print_function, unicode_literals
import json
import logging
from slugid import nice as slugid
-from .util import (find_decision_task, create_task)
+from .util import (find_decision_task, create_task_from_def)
from .registry import register_callback_action
from taskgraph.util.taskcluster import get_artifact
from taskgraph.util.parameterization import resolve_task_references
from taskgraph.taskgraph import TaskGraph
TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
logger = logging.getLogger(__name__)
@@ -135,9 +135,9 @@ def mochitest_retrigger_action(parameter
# tweak the treeherder symbol
new_task_definition['extra']['treeherder']['symbol'] += '-custom'
logging.info("New task definition: %s", new_task_definition)
# actually create the new task
new_task_id = slugid()
- create_task(new_task_id, new_task_definition, parameters['level'])
+ create_task_from_def(new_task_id, new_task_definition, parameters['level'])
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -8,19 +8,19 @@ from __future__ import absolute_import,
import json
import os
import inspect
import re
from mozbuild.util import memoize
from types import FunctionType
from collections import namedtuple
+from taskgraph import create
from taskgraph.util.docker import docker_image
from taskgraph.parameters import Parameters
-from . import util
GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
actions = []
callbacks = {}
Action = namedtuple('Action', [
@@ -304,17 +304,17 @@ def trigger_action_callback(task_group_i
the action callback in testing mode, without actually creating tasks.
"""
cb = get_callbacks().get(callback, None)
if not cb:
raise Exception('Unknown callback: {}. Known callbacks: {}'.format(
callback, get_callbacks().keys()))
if test:
- util.testing = True
+ create.testing = True
cb(Parameters(**parameters), input, task_group_id, task_id, task)
@memoize
def _load():
# Load all modules from this folder, relying on the side-effects of register_
# functions to populate the action registry.
--- a/taskcluster/taskgraph/actions/run_missing_tests.py
+++ b/taskcluster/taskgraph/actions/run_missing_tests.py
@@ -2,22 +2,20 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import logging
-from slugid import nice as slugid
from .registry import register_callback_action
-from .util import create_task, find_decision_task
+from .util import create_tasks, find_decision_task
from taskgraph.util.taskcluster import get_artifact
-from taskgraph.util.parameterization import resolve_task_references
from taskgraph.taskgraph import TaskGraph
logger = logging.getLogger(__name__)
@register_callback_action(
name='run-missing-tests',
title='Run Missing Tests',
@@ -46,22 +44,14 @@ def run_missing_tests(parameters, input,
already_run = 0
for label in target_tasks:
task = full_task_graph.tasks[label]
if task.kind != 'test':
continue # not a test
if label in label_to_taskid:
already_run += 1
continue
- to_run.append(task)
-
- for task in to_run:
+ to_run.append(label)
- # fix up the task's dependencies, similar to how optimization would
- # have done in the decision
- dependencies = {name: label_to_taskid[label]
- for name, label in task.dependencies.iteritems()}
- task_def = resolve_task_references(task.label, task.task, dependencies)
- task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
- create_task(slugid(), task_def, parameters['level'])
+ create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
logger.info('Out of {} test tasks, {} already existed and the action created {}'.format(
already_run + len(to_run), already_run, len(to_run)))
--- a/taskcluster/taskgraph/actions/util.py
+++ b/taskcluster/taskgraph/actions/util.py
@@ -1,39 +1,52 @@
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
-import json
-import sys
-
from taskgraph import create
+from taskgraph.taskgraph import TaskGraph
+from taskgraph.optimize import optimize_task_graph
from taskgraph.util.taskcluster import get_session, find_task_id
-# this is set to true for `mach taskgraph action-callback --test`
-testing = False
-
def find_decision_task(parameters):
"""Given the parameters for this action, find the taskId of the decision
task"""
return find_task_id('gecko.v2.{}.pushlog-id.{}.decision'.format(
parameters['project'],
parameters['pushlog_id']))
-def create_task(task_id, task_def, level):
- """Create a new task. The task definition will have {relative-datestamp':
- '..'} rendered just like in a decision task. Action callbacks should use
- this function to create new tasks, as it has the additional advantage of
- allowing easy debugging with `mach taskgraph action-callback --test`."""
+def create_task_from_def(task_id, task_def, level):
+ """Create a new task from a definition rather than from a label
+ that is already in the full-task-graph. The task definition will
+ have {'relative-datestamp': '..'} rendered just like in a decision task.
+ Use this for entirely new tasks or ones that change internals of the task.
+ It is useful if you want to "edit" the full_task_graph and then hand
+ it to this function. No dependencies will be scheduled. You must handle
+ this yourself. Seeing how create_tasks handles it might prove helpful."""
task_def['schedulerId'] = 'gecko-level-{}'.format(level)
- if testing:
- json.dump([task_id, task_def], sys.stdout,
- sort_keys=True, indent=4, separators=(',', ': '))
- return
label = task_def['metadata']['name']
session = get_session()
create.create_task(session, task_id, label, task_def)
+
+
+def create_tasks(to_run, full_task_graph, label_to_taskid, params, decision_task_id):
+ """Create new tasks. The task definition will have {'relative-datestamp':
+ '..'} rendered just like in a decision task. Action callbacks should use
+ this function to create new tasks, as it
+ allows easy debugging with `mach taskgraph action-callback --test`.
+ This builds up the full set of tasks required in order to run the requested tasks."""
+ to_run = set(to_run)
+ target_graph = full_task_graph.graph.transitive_closure(to_run)
+ target_task_graph = TaskGraph(
+ {l: full_task_graph[l] for l in target_graph.nodes},
+ target_graph)
+ optimized_task_graph, label_to_taskid = optimize_task_graph(target_task_graph,
+ params,
+ to_run,
+ label_to_taskid)
+ create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -4,42 +4,46 @@
from __future__ import absolute_import, print_function, unicode_literals
import concurrent.futures as futures
import requests
import requests.adapters
import json
import os
+import sys
import logging
from slugid import nice as slugid
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.time import current_json_time
logger = logging.getLogger(__name__)
# the maximum number of parallel createTask calls to make
CONCURRENCY = 50
+# this is set to true for `mach taskgraph action-callback --test`
+testing = False
-def create_tasks(taskgraph, label_to_taskid, params):
+
+def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}
session = requests.Session()
# Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
# that limit. Connections are established as needed, so using a large value
# should not negatively impact performance.
http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
pool_maxsize=CONCURRENCY)
session.mount('https://', http_adapter)
session.mount('http://', http_adapter)
- decision_task_id = os.environ.get('TASK_ID')
+ decision_task_id = decision_task_id or os.environ.get('TASK_ID')
# when running as an actual decision task, we use the decision task's
# taskId as the taskGroupId. The process that created the decision task
# helpfully placed it in this same taskGroup. If there is no $TASK_ID,
# fall back to a slugid
task_group_id = decision_task_id or slugid()
scheduler_id = 'gecko-level-{}'.format(params['level'])
@@ -94,16 +98,21 @@ def create_tasks(taskgraph, label_to_tas
def create_task(session, task_id, label, task_def):
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
# with credentials appropriate to this job.
# Resolve timestamps
now = current_json_time(datetime_format=True)
task_def = resolve_timestamps(now, task_def)
+ if testing:
+ json.dump([task_id, task_def], sys.stdout,
+ sort_keys=True, indent=4, separators=(',', ': '))
+ return
+
logger.debug("Creating task with taskId {} for {}".format(task_id, label))
res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
data=json.dumps(task_def))
if res.status_code != 200:
try:
logger.error(res.json()['message'])
except:
logger.error(res.text)