--- a/taskcluster/docs/actions.rst
+++ b/taskcluster/docs/actions.rst
@@ -35,17 +35,17 @@ a custom action task can be more efficie
Creating a Callback Action
--------------------------
A *callback action* is an action that calls back into in-tree logic. That is,
you register the action with name, title, description, context, input schema and a
python callback. When the action is triggered in a user interface,
input matching the schema is collected, passed to a new task which then calls
your python callback, enabling it to do pretty much anything it wants to.
-To create a new action you must create a file
+To create a new callback action you must create a file
``taskcluster/taskgraph/actions/my-action.py``, that at minimum contains::
from registry import register_callback_action
@register_callback_action(
name='hello',
title='Say Hello',
symbol='hw', # Show the callback task in treeherder as 'hw'
@@ -53,16 +53,30 @@ To create a new action you must create a
order=10000, # Order in which it should appear relative to other actions
)
def hello_world_action(parameters, input, task_group_id, task_id, task):
# parameters is an instance of taskgraph.parameters.Parameters
# it carries decision task parameters from the original decision task.
# input, task_id, and task should all be None
print "Hello was triggered from taskGroupId: " + taskGroupId
+Callback actions are configured in-tree to generate three artifacts when they run.
+These artifacts are similar to the artifacts generated by decision tasks since
+callback actions are essentially miniature decision tasks. The artifacts are:
+
+``task-graph.json``:
+ The graph of all tasks created by the action task. Includes tasks
+ created to satisfy requirements.
+``to-run.json``:
+ The set of tasks that the action task requested to build. This does not
+ include the requirements.
+``label-to-taskid.json``:
+    This is the mapping from label to ``taskId`` for all tasks involved in
+ the task-graph. This includes dependencies.
+
The example above defines an action that is available in the context-menu for
the entire task-group (result-set or push in Treeherder terminology). To create
an action that shows up in the context menu for a task we would specify the
``context`` parameter.
Setting the Action Context
..........................
--- a/taskcluster/taskgraph/actions/add_new_jobs.py
+++ b/taskcluster/taskgraph/actions/add_new_jobs.py
@@ -3,19 +3,17 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
from .registry import register_callback_action
-from .util import (create_tasks, find_decision_task)
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import (create_tasks, fetch_graph_and_labels)
@register_callback_action(
name='add-new-jobs',
title='Add new jobs',
symbol='add-new',
description="Add new jobs using task labels.",
order=10000,
@@ -29,21 +27,17 @@ from taskgraph.taskgraph import TaskGrap
'items': {
'type': 'string'
}
}
}
}
)
def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
- decision_task_id = find_decision_task(parameters)
-
- full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
- _, full_task_graph = TaskGraph.from_json(full_task_graph)
- label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+ decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
to_run = []
for elem in input['tasks']:
if elem in full_task_graph.tasks:
to_run.append(elem)
else:
raise Exception('{} was not found in the task-graph'.format(elem))
--- a/taskcluster/taskgraph/actions/add_talos.py
+++ b/taskcluster/taskgraph/actions/add_talos.py
@@ -4,19 +4,17 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import logging
from .registry import register_callback_action
-from .util import create_tasks, find_decision_task
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import create_tasks, fetch_graph_and_labels
logger = logging.getLogger(__name__)
@register_callback_action(
name='run-all-talos',
title='Run All Talos Tests',
symbol='raT',
@@ -34,21 +32,17 @@ logger = logging.getLogger(__name__)
'title': 'Times',
'description': 'How many times to run each task.',
}
},
'additionalProperties': False
},
)
def add_all_talos(parameters, input, task_group_id, task_id, task):
- decision_task_id = find_decision_task(parameters)
-
- full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
- _, full_task_graph = TaskGraph.from_json(full_task_graph)
- label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+ decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
times = input.get('times', 1)
for i in xrange(times):
to_run = [label
for label, entry
in full_task_graph.tasks.iteritems() if 'talos_try_name' in entry.attributes]
create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
--- a/taskcluster/taskgraph/actions/mochitest_retrigger.py
+++ b/taskcluster/taskgraph/actions/mochitest_retrigger.py
@@ -6,21 +6,19 @@
from __future__ import absolute_import, print_function, unicode_literals
import json
import logging
from slugid import nice as slugid
-from .util import (find_decision_task, create_task_from_def)
+from .util import (create_task_from_def, fetch_graph_and_labels)
from .registry import register_callback_action
-from taskgraph.util.taskcluster import get_artifact
from taskgraph.util.parameterization import resolve_task_references
-from taskgraph.taskgraph import TaskGraph
TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
logger = logging.getLogger(__name__)
@register_callback_action(
name='retrigger-mochitest-reftest-with-options',
@@ -77,21 +75,17 @@ logger = logging.getLogger(__name__)
'additionalProperties': {'type': 'string'}
}
},
'additionalProperties': False,
'required': ['path']
}
)
def mochitest_retrigger_action(parameters, input, task_group_id, task_id, task):
- decision_task_id = find_decision_task(parameters)
-
- full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
- _, full_task_graph = TaskGraph.from_json(full_task_graph)
- label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+ decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
pre_task = full_task_graph.tasks[task['metadata']['name']]
# fix up the task's dependencies, similar to how optimization would
# have done in the decision
dependencies = {name: label_to_taskid[label]
for name, label in pre_task.dependencies.iteritems()}
new_task_definition = resolve_task_references(pre_task.label, pre_task.task, dependencies)
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -185,17 +185,17 @@ def register_callback_action(name, title
repo_scope = 'assume:repo:{}/{}:*'.format(
match.group(1), match.group(2))
task_group_id = os.environ.get('TASK_ID', slugid())
return {
'created': {'$fromNow': ''},
'deadline': {'$fromNow': '12 hours'},
- 'expires': {'$fromNow': '14 days'},
+ 'expires': {'$fromNow': '1 year'},
'metadata': {
'owner': 'mozilla-taskcluster-maintenance@mozilla.com',
'source': '{}raw-file/{}/{}'.format(
parameters['head_repository'], parameters['head_rev'], source_path,
),
'name': 'Action: {}'.format(title),
'description': 'Task executing callback for action.\n\n---\n' + description,
},
@@ -210,31 +210,40 @@ def register_callback_action(name, title
'createdForUser': parameters['owner'],
'kind': 'action-callback',
},
'routes': [
'tc-treeherder.v2.{}.{}.{}'.format(
parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
'tc-treeherder-stage.v2.{}.{}.{}'.format(
parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
+ 'index.gecko.v2.{}.pushlog-id.{}.actions.${{ownTaskId}}'.format(
+ parameters['project'], parameters['pushlog_id'])
],
'payload': {
'env': {
'GECKO_BASE_REPOSITORY': 'https://hg.mozilla.org/mozilla-unified',
'GECKO_HEAD_REPOSITORY': parameters['head_repository'],
'GECKO_HEAD_REF': parameters['head_ref'],
'GECKO_HEAD_REV': parameters['head_rev'],
'HG_STORE_PATH': '/builds/worker/checkouts/hg-store',
'ACTION_TASK_GROUP_ID': task_group_id,
'ACTION_TASK_ID': {'$json': {'$eval': 'taskId'}},
'ACTION_TASK': {'$json': {'$eval': 'task'}},
'ACTION_INPUT': {'$json': {'$eval': 'input'}},
'ACTION_CALLBACK': cb.__name__,
'ACTION_PARAMETERS': {'$json': {'$eval': 'parameters'}},
},
+ 'artifacts': {
+ 'public': {
+ 'type': 'directory',
+ 'path': '/builds/worker/artifacts',
+ 'expires': {'$fromNow': '1 year'},
+ },
+ },
'cache': {
'level-{}-checkouts'.format(parameters['level']):
'/builds/worker/checkouts',
},
'features': {
'taskclusterProxy': True,
'chainOfTrust': True,
},
--- a/taskcluster/taskgraph/actions/retrigger.py
+++ b/taskcluster/taskgraph/actions/retrigger.py
@@ -5,21 +5,19 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import logging
from .util import (
create_tasks,
- find_decision_task
+ fetch_graph_and_labels
)
from .registry import register_callback_action
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
logger = logging.getLogger(__name__)
@register_callback_action(
title='Retrigger',
name='retrigger',
symbol='rt',
@@ -46,21 +44,17 @@ logger = logging.getLogger(__name__)
'maximum': 6,
'title': 'Times',
'description': 'How many times to run each task.',
}
}
}
)
def retrigger_action(parameters, input, task_group_id, task_id, task):
- decision_task_id = find_decision_task(parameters)
-
- full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
- _, full_task_graph = TaskGraph.from_json(full_task_graph)
- label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+ decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
label = task['metadata']['name']
with_downstream = ' '
to_run = [label]
if input.get('downstream'):
to_run = full_task_graph.graph.transitive_closure(set(to_run), reverse=True).nodes
to_run = to_run & set(label_to_taskid.keys())
--- a/taskcluster/taskgraph/actions/run_missing_tests.py
+++ b/taskcluster/taskgraph/actions/run_missing_tests.py
@@ -4,19 +4,18 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import logging
from .registry import register_callback_action
-from .util import create_tasks, find_decision_task
+from .util import create_tasks, fetch_graph_and_labels
from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
logger = logging.getLogger(__name__)
@register_callback_action(
name='run-missing-tests',
title='Run Missing Tests',
symbol='rmt',
@@ -25,22 +24,18 @@ logger = logging.getLogger(__name__)
"\n"
"This action is for use on pushes that will be merged into another branch,"
"to check that optimization hasn't hidden any failures."
),
order=100, # Useful for sheriffs, but not top of the list
context=[], # Applies to decision task
)
def run_missing_tests(parameters, input, task_group_id, task_id, task):
- decision_task_id = find_decision_task(parameters)
-
- full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
- _, full_task_graph = TaskGraph.from_json(full_task_graph)
+ decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
target_tasks = get_artifact(decision_task_id, "public/target-tasks.json")
- label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
# The idea here is to schedule all tasks of the `test` kind that were
# targetted but did not appear in the final task-graph -- those were the
# optimized tasks.
to_run = []
already_run = 0
for label in target_tasks:
task = full_task_graph.tasks[label]
--- a/taskcluster/taskgraph/actions/util.py
+++ b/taskcluster/taskgraph/actions/util.py
@@ -1,30 +1,61 @@
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
+import logging
+
+from requests.exceptions import HTTPError
+
from taskgraph import create
+from taskgraph.decision import write_artifact
from taskgraph.taskgraph import TaskGraph
from taskgraph.optimize import optimize_task_graph
-from taskgraph.util.taskcluster import get_session, find_task_id
+from taskgraph.util.taskcluster import get_session, find_task_id, get_artifact, list_tasks
+
+logger = logging.getLogger(__name__)
def find_decision_task(parameters):
"""Given the parameters for this action, find the taskId of the decision
task"""
return find_task_id('gecko.v2.{}.pushlog-id.{}.decision'.format(
parameters['project'],
parameters['pushlog_id']))
+def fetch_graph_and_labels(parameters):
+ decision_task_id = find_decision_task(parameters)
+
+ # First grab the graph and labels generated during the initial decision task
+ full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
+ _, full_task_graph = TaskGraph.from_json(full_task_graph)
+ label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+ # Now fetch any modifications made by action tasks and swap out new tasks
+ # for old ones
+ namespace = 'gecko.v2.{}.pushlog-id.{}.actions'.format(
+ parameters['project'],
+ parameters['pushlog_id'])
+ for action in list_tasks(namespace):
+ try:
+ run_label_to_id = get_artifact(action, "public/label-to-taskid.json")
+ label_to_taskid.update(run_label_to_id)
+ except HTTPError as e:
+ logger.info('Skipping {} due to missing artifact! Error: {}'.format(action, e))
+ continue
+
+ return (decision_task_id, full_task_graph, label_to_taskid)
+
+
def create_task_from_def(task_id, task_def, level):
"""Create a new task from a definition rather than from a label
that is already in the full-task-graph. The task definition will
have {relative-datestamp': '..'} rendered just like in a decision task.
Use this for entirely new tasks or ones that change internals of the task.
It is useful if you want to "edit" the full_task_graph and then hand
it to this function. No dependencies will be scheduled. You must handle
this yourself. Seeing how create_tasks handles it might prove helpful."""
@@ -44,9 +75,12 @@ def create_tasks(to_run, full_task_graph
target_graph = full_task_graph.graph.transitive_closure(to_run)
target_task_graph = TaskGraph(
{l: full_task_graph[l] for l in target_graph.nodes},
target_graph)
optimized_task_graph, label_to_taskid = optimize_task_graph(target_task_graph,
params,
to_run,
label_to_taskid)
+ write_artifact('task-graph.json', optimized_task_graph.to_json())
+ write_artifact('label-to-taskid.json', label_to_taskid)
+ write_artifact('to-run.json', list(to_run))
create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -1,16 +1,17 @@
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
+import datetime
import functools
import yaml
import requests
from mozbuild.util import memoize
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
_TC_ARTIFACT_LOCATION = \
@@ -22,19 +23,22 @@ def get_session():
session = requests.Session()
retry = Retry(total=5, backoff_factor=0.1,
status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retry))
session.mount('https://', HTTPAdapter(max_retries=retry))
return session
-def _do_request(url):
+def _do_request(url, content=None):
session = get_session()
- response = session.get(url, stream=True)
+ if content is None:
+ response = session.get(url, stream=True)
+ else:
+ response = session.post(url, json=content)
if response.status_code >= 400:
# Consume content before raise_for_status, so that the connection can be
# reused.
response.content
response.raise_for_status()
return response
@@ -69,22 +73,22 @@ def get_artifact(task_id, path, use_prox
return _handle_artifact(path, response)
def list_artifacts(task_id, use_proxy=False):
response = _do_request(get_artifact_url(task_id, '', use_proxy).rstrip('/'))
return response.json()['artifacts']
-def get_index_url(index_path, use_proxy=False):
+def get_index_url(index_path, use_proxy=False, multiple=False):
if use_proxy:
- INDEX_URL = 'http://taskcluster/index/v1/task/{}'
+ INDEX_URL = 'http://taskcluster/index/v1/task{}/{}'
else:
- INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
- return INDEX_URL.format(index_path)
+ INDEX_URL = 'https://index.taskcluster.net/v1/task{}/{}'
+ return INDEX_URL.format('s' if multiple else '', index_path)
def find_task_id(index_path, use_proxy=False):
try:
response = _do_request(get_index_url(index_path, use_proxy))
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
raise KeyError("index path {} not found".format(index_path))
@@ -93,16 +97,40 @@ def find_task_id(index_path, use_proxy=F
def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
full_path = index_path + '/artifacts/' + artifact_path
response = _do_request(get_index_url(full_path, use_proxy))
return _handle_artifact(full_path, response)
+def list_tasks(index_path, use_proxy=False):
+ """
+ Returns a list of task_ids where each task_id is indexed under a path
+ in the index. Results are sorted by expiration date from oldest to newest.
+ """
+ results = []
+ data = {}
+ while True:
+ response = _do_request(get_index_url(index_path, use_proxy, multiple=True), data)
+ response = response.json()
+ results += response['tasks']
+ if response.get('continuationToken'):
+ data = {'continuationToken': response.get('continuationToken')}
+ else:
+ break
+
+ # We can sort on expires because in the general case
+ # all of these tasks should be created with the same expires time so they end up in
+ # order from earliest to latest action. If more correctness is needed, consider
+ # fetching each task and sorting on the created date.
+ results.sort(key=lambda t: datetime.datetime.strptime(t['expires'], '%Y-%m-%dT%H:%M:%S.%fZ'))
+ return [t['taskId'] for t in results]
+
+
def get_task_url(task_id, use_proxy=False):
if use_proxy:
TASK_URL = 'http://taskcluster/queue/v1/task/{}'
else:
TASK_URL = 'https://queue.taskcluster.net/v1/task/{}'
return TASK_URL.format(task_id)