Bug 1381669 - Add backfill as an actions.json task
MozReview-Commit-ID: 3Q53x0KGWgG
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/actions/backfill.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import logging
+
+import requests
+from slugid import nice as slugid
+
+from .registry import register_callback_action
+from .util import create_task
+from taskgraph.util.taskcluster import get_artifact_from_index
+from taskgraph.util.parameterization import resolve_task_references
+from taskgraph.taskgraph import TaskGraph
+
+PUSHLOG_TMPL = '{}json-pushes?version=2&startID={}&endID={}'
+INDEX_TMPL = 'gecko.v2.{}.pushlog-id.{}.decision'
+
+logger = logging.getLogger(__name__)
+
+
@register_callback_action(
    title='Backfill',
    name='backfill',
    symbol='Bk',
    description=('Take the label of the current task, '
                 'and trigger the task with that label '
                 'on previous pushes in the same project.'),
    order=0,
    context=[{}],  # An empty context makes this action available for all tasks.
    schema={
        'type': 'object',
        'properties': {
            'depth': {
                'type': 'integer',
                'default': 5,
                'minimum': 1,
                'maximum': 10,
                'title': 'Depth',
                'description': ('The number of previous pushes before the current '
                                'push to attempt to trigger this task on.')
            }
        },
        'additionalProperties': False
    },
    # Backfilling across try pushes makes no sense; hide the action there.
    available=lambda parameters: parameters.get('project', None) != 'try'
)
def backfill_action(parameters, input, task_group_id, task_id, task):
    """Re-create this task on up to ``input['depth']`` preceding pushes.

    Walks the pushlog backwards starting at the push just before the current
    one, collects push ids, and for each push where the task's label exists in
    that push's full task graph, creates a new task with its dependencies
    resolved against that push's label-to-taskid map.

    :param parameters: decision-task parameters (project, pushlog_id,
                       head_repository, level, ...).
    :param input: action input validated against the schema above.
    :param task_group_id: unused; part of the callback-action signature.
    :param task_id: unused; part of the callback-action signature.
    :param task: the task definition whose label is being backfilled.
    """
    label = task['metadata']['name']
    pushes = []
    depth = input.get('depth', 5)
    # Start with the push immediately preceding the current one.
    end_id = int(parameters['pushlog_id']) - 1

    while True:
        start_id = max(end_id - depth, 0)
        pushlog_url = PUSHLOG_TMPL.format(parameters['head_repository'], start_id, end_id)
        r = requests.get(pushlog_url)
        r.raise_for_status()
        pushes.extend(r.json()['pushes'].keys())
        if len(pushes) >= depth:
            break

        end_id = start_id - 1
        start_id -= depth
        # Ran off the beginning of the pushlog; use whatever we collected.
        if start_id < 0:
            break

    # Push ids arrive as strings; sort numerically so e.g. '9' < '10',
    # then keep only the `depth` most recent pushes.
    pushes = sorted(pushes, key=int)[-depth:]

    for push in pushes:
        full_task_graph = get_artifact_from_index(
            INDEX_TMPL.format(parameters['project'], push),
            'public/full-task-graph.json')
        _, full_task_graph = TaskGraph.from_json(full_task_graph)
        label_to_taskid = get_artifact_from_index(
            INDEX_TMPL.format(parameters['project'], push),
            'public/label-to-taskid.json')

        if label in full_task_graph.tasks:
            task = full_task_graph.tasks[label]
            # Map each dependency name to the concrete taskId on that push.
            dependencies = {name: label_to_taskid[dep_label]
                            for name, dep_label in task.dependencies.iteritems()}
            task_def = resolve_task_references(task.label, task.task, dependencies)
            task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
            task_def['schedulerId'] = 'gecko-level-{}'.format(parameters['level'])
            create_task(slugid(), task_def)
        else:
            # Use the module logger (not the root `logging` module) so the
            # message respects this module's logging configuration.
            logger.info('Could not find {} on {}. Skipping.'.format(label, push))
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -30,16 +30,26 @@ def _do_request(url):
if response.status_code >= 400:
# Consume content before raise_for_status, so that the connection can be
# reused.
response.content
response.raise_for_status()
return response
+def _handle_artifact(path, response):
+ if path.endswith('.json'):
+ return response.json()
+ if path.endswith('.yml'):
+ return yaml.load(response.text)
+ response.raw.read = functools.partial(response.raw.read,
+ decode_content=True)
+ return response.raw
+
+
def get_artifact_url(task_id, path, use_proxy=False):
    """Return the queue artifact URL for `path` on `task_id`.

    With ``use_proxy`` the in-cluster taskcluster proxy hostname is used
    instead of the public queue endpoint.
    """
    template = ('http://taskcluster/queue/v1/task/{}/artifacts/{}' if use_proxy
                else 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}')
    return template.format(task_id, path)
@@ -48,23 +58,17 @@ def get_artifact(task_id, path, use_prox
Returns the artifact with the given path for the given task id.
If the path ends with ".json" or ".yml", the content is deserialized as,
respectively, json or yaml, and the corresponding python data (usually
dict) is returned.
For other types of content, a file-like object is returned.
"""
response = _do_request(get_artifact_url(task_id, path, use_proxy))
- if path.endswith('.json'):
- return response.json()
- if path.endswith('.yml'):
- return yaml.load(response.text)
- response.raw.read = functools.partial(response.raw.read,
- decode_content=True)
- return response.raw
+ return _handle_artifact(path, response)
def list_artifacts(task_id, use_proxy=False):
    """Return the artifact list for `task_id` from the queue service."""
    # An empty path yields the artifacts listing endpoint; strip the
    # trailing slash the URL template would otherwise leave behind.
    listing_url = get_artifact_url(task_id, '', use_proxy).rstrip('/')
    return _do_request(listing_url).json()['artifacts']
def get_index_url(index_path, use_proxy=False):
@@ -75,16 +79,22 @@ def get_index_url(index_path, use_proxy=
return INDEX_URL.format(index_path)
def find_task_id(index_path, use_proxy=False):
    """Return the taskId currently registered at the given index path."""
    url = get_index_url(index_path, use_proxy)
    return _do_request(url).json()['taskId']
def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
    """Fetch `artifact_path` from the task indexed at `index_path`.

    Uses the index service's artifact passthrough endpoint, then
    deserializes the body the same way `get_artifact` does.
    """
    full_path = '{}/artifacts/{}'.format(index_path, artifact_path)
    response = _do_request(get_index_url(full_path, use_proxy))
    return _handle_artifact(full_path, response)
+
+
def get_task_url(task_id, use_proxy=False):
    """Return the queue URL for the task definition of `task_id`.

    With ``use_proxy`` the in-cluster taskcluster proxy hostname is used
    instead of the public queue endpoint.
    """
    template = ('http://taskcluster/queue/v1/task/{}' if use_proxy
                else 'https://queue.taskcluster.net/v1/task/{}')
    return template.format(task_id)