Bug 1381669 - Add backfill as an actions.json task (draft)
author: Brian Stack <bstack@mozilla.com>
date: Thu, 27 Jul 2017 15:27:50 -0700
changeset: 617174 92b4d95dc535abad26ba25d18e1047fd85aae80a
parent: 617156 eccdd37590f904017d667a9c06218bc9765fc20d
child: 617179 e6ad4ffebd7ea0c880ad43db8d6a3f64b20cc644
push id: 70962
push user: bstack@mozilla.com
push date: Fri, 28 Jul 2017 02:50:56 +0000
bugs: 1381669
milestone: 56.0a1
Bug 1381669 - Add backfill as an actions.json task
MozReview-Commit-ID: 3Q53x0KGWgG
taskcluster/taskgraph/actions/backfill.py
taskcluster/taskgraph/util/taskcluster.py
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/actions/backfill.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import logging
+
+import requests
+from slugid import nice as slugid
+
+from .registry import register_callback_action
+from .util import create_task
+from taskgraph.util.taskcluster import get_artifact_from_index
+from taskgraph.util.parameterization import resolve_task_references
+from taskgraph.taskgraph import TaskGraph
+
+PUSHLOG_TMPL = '{}json-pushes?version=2&startID={}&endID={}'
+INDEX_TMPL = 'gecko.v2.{}.pushlog-id.{}.decision'
+
+logger = logging.getLogger(__name__)
+
+
+@register_callback_action(
+    title='Backfill',
+    name='backfill',
+    symbol='Bk',
+    description=('Take the label of the current task, '
+                 'and trigger the task with that label '
+                 'on previous pushes in the same project.'),
+    order=0,
+    context=[{}],  # This will be available for all tasks
+    schema={
+        'type': 'object',
+        'properties': {
+            'depth': {
+                'type': 'integer',
+                'default': 5,
+                'minimum': 1,
+                'maximum': 10,
+                'title': 'Depth',
+                'description': ('The number of previous pushes before the current '
+                                'push to attempt to trigger this task on.')
+            }
+        },
+        'additionalProperties': False
+    },
+    available=lambda parameters: parameters.get('project', None) != 'try'
+)
+def backfill_action(parameters, input, task_group_id, task_id, task):
+    """Re-create the selected task on up to ``depth`` earlier pushes.
+
+    The label is taken from ``task['metadata']['name']``.  Ids of the pushes
+    preceding ``parameters['pushlog_id']`` are collected from the repository's
+    ``json-pushes`` endpoint.  For each one, the ``full-task-graph.json`` and
+    ``label-to-taskid.json`` artifacts are fetched through the index path built
+    from ``INDEX_TMPL``, and a new task is created if that graph has the label;
+    otherwise the push is skipped with a log message.
+
+    :param parameters: mapping; ``pushlog_id``, ``head_repository``,
+        ``project`` and ``level`` are read.
+    :param input: action input; ``depth`` (default 5) is the number of earlier
+        pushes to target.
+    :param task_group_id: unused.
+    :param task_id: unused.
+    :param task: definition of the task the action was triggered on.
+    :raises requests.HTTPError: if a pushlog request returns an error status.
+    """
+    label = task['metadata']['name']
+    depth = input.get('depth', 5)
+    # Works whether or not the repository URL carries a trailing slash.
+    repo_url = parameters['head_repository'].rstrip('/') + '/'
+    end_id = int(parameters['pushlog_id']) - 1
+
+    # A set keeps ids unique even if two query windows were to overlap.
+    push_ids = set()
+    # Walk backwards in windows of ``depth`` ids until enough pushes are found
+    # or the beginning of the pushlog is reached.
+    while end_id > 0 and len(push_ids) < depth:
+        start_id = max(end_id - depth, 0)
+        r = requests.get(PUSHLOG_TMPL.format(repo_url, start_id, end_id))
+        r.raise_for_status()
+        push_ids.update(r.json()['pushes'].keys())
+        # Per the pushlog docs, startID is exclusive and endID inclusive, so the
+        # next window must end at this window's start or that push is skipped.
+        end_id = start_id
+
+    # JSON object keys are strings: sort numerically so that '100' ranks after
+    # '99', then keep the ``depth`` most recent pushes.
+    push_ids = sorted(push_ids, key=int)[-depth:]
+
+    for push_id in push_ids:
+        index_path = INDEX_TMPL.format(parameters['project'], push_id)
+        _, full_task_graph = TaskGraph.from_json(
+            get_artifact_from_index(index_path, 'public/full-task-graph.json'))
+
+        if label not in full_task_graph.tasks:
+            logger.info('Could not find {} on {}. Skipping.'.format(label, push_id))
+            continue
+
+        label_to_taskid = get_artifact_from_index(index_path, 'public/label-to-taskid.json')
+        new_task = full_task_graph.tasks[label]
+        dependencies = {name: label_to_taskid[dep_label]
+                        for name, dep_label in new_task.dependencies.iteritems()}
+        task_def = resolve_task_references(new_task.label, new_task.task, dependencies)
+        task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
+        task_def['schedulerId'] = 'gecko-level-{}'.format(parameters['level'])
+        create_task(slugid(), task_def)
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -30,16 +30,26 @@ def _do_request(url):
     if response.status_code >= 400:
         # Consume content before raise_for_status, so that the connection can be
         # reused.
         response.content
     response.raise_for_status()
     return response
 
 
+def _handle_artifact(path, response):
+    """Decode ``response`` per ``path``: parse .json/.yml, else return the raw stream."""
+    if path.endswith('.json'):
+        return response.json()
+    if path.endswith('.yml'):
+        return yaml.load(response.text)
+    response.raw.read = functools.partial(response.raw.read, decode_content=True)
+    return response.raw
+
+
 def get_artifact_url(task_id, path, use_proxy=False):
     if use_proxy:
         ARTIFACT_URL = 'http://taskcluster/queue/v1/task/{}/artifacts/{}'
     else:
         ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
     return ARTIFACT_URL.format(task_id, path)
 
 
@@ -48,23 +58,17 @@ def get_artifact(task_id, path, use_prox
     Returns the artifact with the given path for the given task id.
 
     If the path ends with ".json" or ".yml", the content is deserialized as,
     respectively, json or yaml, and the corresponding python data (usually
     dict) is returned.
     For other types of content, a file-like object is returned.
     """
     response = _do_request(get_artifact_url(task_id, path, use_proxy))
-    if path.endswith('.json'):
-        return response.json()
-    if path.endswith('.yml'):
-        return yaml.load(response.text)
-    response.raw.read = functools.partial(response.raw.read,
-                                          decode_content=True)
-    return response.raw
+    return _handle_artifact(path, response)
 
 
 def list_artifacts(task_id, use_proxy=False):
     response = _do_request(get_artifact_url(task_id, '', use_proxy).rstrip('/'))
     return response.json()['artifacts']
 
 
 def get_index_url(index_path, use_proxy=False):
@@ -75,16 +79,22 @@ def get_index_url(index_path, use_proxy=
     return INDEX_URL.format(index_path)
 
 
 def find_task_id(index_path, use_proxy=False):
     response = _do_request(get_index_url(index_path, use_proxy))
     return response.json()['taskId']
 
 
+def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
+    """Fetch ``artifact_path`` under ``index_path`` via the index and decode it."""
+    full_path = index_path + '/artifacts/' + artifact_path
+    return _handle_artifact(full_path, _do_request(get_index_url(full_path, use_proxy)))
+
+
 def get_task_url(task_id, use_proxy=False):
     if use_proxy:
         TASK_URL = 'http://taskcluster/queue/v1/task/{}'
     else:
         TASK_URL = 'https://queue.taskcluster.net/v1/task/{}'
     return TASK_URL.format(task_id)