Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks draft
authorBrian Stack <bstack@mozilla.com>
Wed, 20 Sep 2017 12:52:29 -0700
changeset 671361 c2f7e27723d5b0382e449a18160a29079505df12
parent 671268 756e10aa8bbd416cbc49b7739f78fb81d5525477
child 733510 9faa9c4005fbfc03dd9791c0b4667402153f7921
push id81930
push userbstack@mozilla.com
push dateWed, 27 Sep 2017 21:39:41 +0000
bugs1400223
milestone58.0a1
Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks MozReview-Commit-ID: 7ZTbS5h0vPA
taskcluster/docs/actions.rst
taskcluster/taskgraph/actions/add_new_jobs.py
taskcluster/taskgraph/actions/add_talos.py
taskcluster/taskgraph/actions/mochitest_retrigger.py
taskcluster/taskgraph/actions/registry.py
taskcluster/taskgraph/actions/retrigger.py
taskcluster/taskgraph/actions/run_missing_tests.py
taskcluster/taskgraph/actions/util.py
taskcluster/taskgraph/util/taskcluster.py
--- a/taskcluster/docs/actions.rst
+++ b/taskcluster/docs/actions.rst
@@ -35,17 +35,17 @@ a custom action task can be more efficie
 Creating a Callback Action
 --------------------------
 A *callback action* is an action that calls back into in-tree logic. That is,
 you register the action with name, title, description, context, input schema and a
 python callback. When the action is triggered in a user interface,
 input matching the schema is collected, passed to a new task which then calls
 your python callback, enabling it to do pretty much anything it wants to.
 
-To create a new action you must create a file
+To create a new callback action you must create a file
 ``taskcluster/taskgraph/actions/my-action.py``, that at minimum contains::
 
   from registry import register_callback_action
 
   @register_callback_action(
       name='hello',
       title='Say Hello',
       symbol='hw',  # Show the callback task in treeherder as 'hw'
@@ -53,16 +53,30 @@ To create a new action you must create a
       order=10000,  # Order in which it should appear relative to other actions
   )
   def hello_world_action(parameters, input, task_group_id, task_id, task):
       # parameters is an instance of taskgraph.parameters.Parameters
       # it carries decision task parameters from the original decision task.
       # input, task_id, and task should all be None
       print "Hello was triggered from taskGroupId: " + taskGroupId
 
+Callback actions are configured in-tree to generate 3 artifacts when they run.
+These artifacts are similar to the artifacts generated by decision tasks since
+callback actions are basically mini decision tasks. The artifacts are:
+
+``task-graph.json``:
+  The graph of all tasks created by the action task. Includes tasks
+  created to satisfy requirements.
+``to-run.json``:
+  The set of tasks that the action task requested to build. This does not
+  include the requirements.
+``label-to-taskid.json``:
+  This is the mapping from label to ``taskid`` for all tasks involved in
+  the task-graph. This includes dependencies.
+
 The example above defines an action that is available in the context-menu for
 the entire task-group (result-set or push in Treeherder terminology). To create
 an action that shows up in the context menu for a task we would specify the
 ``context`` parameter.
 
 
 Setting the Action Context
 ..........................
--- a/taskcluster/taskgraph/actions/add_new_jobs.py
+++ b/taskcluster/taskgraph/actions/add_new_jobs.py
@@ -3,19 +3,17 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 from .registry import register_callback_action
 
-from .util import (create_tasks, find_decision_task)
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import (create_tasks, fetch_graph_and_labels)
 
 
 @register_callback_action(
     name='add-new-jobs',
     title='Add new jobs',
     symbol='add-new',
     description="Add new jobs using task labels.",
     order=10000,
@@ -29,21 +27,17 @@ from taskgraph.taskgraph import TaskGrap
                 'items': {
                     'type': 'string'
                 }
             }
         }
     }
 )
 def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     to_run = []
     for elem in input['tasks']:
         if elem in full_task_graph.tasks:
             to_run.append(elem)
         else:
             raise Exception('{} was not found in the task-graph'.format(elem))
 
--- a/taskcluster/taskgraph/actions/add_talos.py
+++ b/taskcluster/taskgraph/actions/add_talos.py
@@ -4,19 +4,17 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 from .registry import register_callback_action
-from .util import create_tasks, find_decision_task
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import create_tasks, fetch_graph_and_labels
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     name='run-all-talos',
     title='Run All Talos Tests',
     symbol='raT',
@@ -34,21 +32,17 @@ logger = logging.getLogger(__name__)
                 'title': 'Times',
                 'description': 'How many times to run each task.',
             }
         },
         'additionalProperties': False
     },
 )
 def add_all_talos(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     times = input.get('times', 1)
     for i in xrange(times):
         to_run = [label
                   for label, entry
                   in full_task_graph.tasks.iteritems() if 'talos_try_name' in entry.attributes]
 
         create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
--- a/taskcluster/taskgraph/actions/mochitest_retrigger.py
+++ b/taskcluster/taskgraph/actions/mochitest_retrigger.py
@@ -6,21 +6,19 @@
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import logging
 
 from slugid import nice as slugid
 
-from .util import (find_decision_task, create_task_from_def)
+from .util import (create_task_from_def, fetch_graph_and_labels)
 from .registry import register_callback_action
-from taskgraph.util.taskcluster import get_artifact
 from taskgraph.util.parameterization import resolve_task_references
-from taskgraph.taskgraph import TaskGraph
 
 TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     name='retrigger-mochitest-reftest-with-options',
@@ -77,21 +75,17 @@ logger = logging.getLogger(__name__)
                 'additionalProperties': {'type': 'string'}
             }
         },
         'additionalProperties': False,
         'required': ['path']
     }
 )
 def mochitest_retrigger_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     pre_task = full_task_graph.tasks[task['metadata']['name']]
 
     # fix up the task's dependencies, similar to how optimization would
     # have done in the decision
     dependencies = {name: label_to_taskid[label]
                     for name, label in pre_task.dependencies.iteritems()}
     new_task_definition = resolve_task_references(pre_task.label, pre_task.task, dependencies)
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -185,17 +185,17 @@ def register_callback_action(name, title
             repo_scope = 'assume:repo:{}/{}:*'.format(
                 match.group(1), match.group(2))
 
             task_group_id = os.environ.get('TASK_ID', slugid())
 
             return {
                 'created': {'$fromNow': ''},
                 'deadline': {'$fromNow': '12 hours'},
-                'expires': {'$fromNow': '14 days'},
+                'expires': {'$fromNow': '1 year'},
                 'metadata': {
                     'owner': 'mozilla-taskcluster-maintenance@mozilla.com',
                     'source': '{}raw-file/{}/{}'.format(
                         parameters['head_repository'], parameters['head_rev'], source_path,
                     ),
                     'name': 'Action: {}'.format(title),
                     'description': 'Task executing callback for action.\n\n---\n' + description,
                 },
@@ -210,31 +210,40 @@ def register_callback_action(name, title
                     'createdForUser': parameters['owner'],
                     'kind': 'action-callback',
                 },
                 'routes': [
                     'tc-treeherder.v2.{}.{}.{}'.format(
                         parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
                     'tc-treeherder-stage.v2.{}.{}.{}'.format(
                         parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
+                    'index.gecko.v2.{}.pushlog-id.{}.actions.${{ownTaskId}}'.format(
+                        parameters['project'], parameters['pushlog_id'])
                 ],
                 'payload': {
                     'env': {
                         'GECKO_BASE_REPOSITORY': 'https://hg.mozilla.org/mozilla-unified',
                         'GECKO_HEAD_REPOSITORY': parameters['head_repository'],
                         'GECKO_HEAD_REF': parameters['head_ref'],
                         'GECKO_HEAD_REV': parameters['head_rev'],
                         'HG_STORE_PATH': '/builds/worker/checkouts/hg-store',
                         'ACTION_TASK_GROUP_ID': task_group_id,
                         'ACTION_TASK_ID': {'$json': {'$eval': 'taskId'}},
                         'ACTION_TASK': {'$json': {'$eval': 'task'}},
                         'ACTION_INPUT': {'$json': {'$eval': 'input'}},
                         'ACTION_CALLBACK': cb.__name__,
                         'ACTION_PARAMETERS': {'$json': {'$eval': 'parameters'}},
                     },
+                    'artifacts': {
+                        'public': {
+                            'type': 'directory',
+                            'path': '/builds/worker/artifacts',
+                            'expires': {'$fromNow': '1 year'},
+                        },
+                    },
                     'cache': {
                         'level-{}-checkouts'.format(parameters['level']):
                             '/builds/worker/checkouts',
                     },
                     'features': {
                         'taskclusterProxy': True,
                         'chainOfTrust': True,
                     },
--- a/taskcluster/taskgraph/actions/retrigger.py
+++ b/taskcluster/taskgraph/actions/retrigger.py
@@ -5,21 +5,19 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 from .util import (
     create_tasks,
-    find_decision_task
+    fetch_graph_and_labels
 )
 from .registry import register_callback_action
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     title='Retrigger',
     name='retrigger',
     symbol='rt',
@@ -46,21 +44,17 @@ logger = logging.getLogger(__name__)
                 'maximum': 6,
                 'title': 'Times',
                 'description': 'How many times to run each task.',
             }
         }
     }
 )
 def retrigger_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     label = task['metadata']['name']
     with_downstream = ' '
     to_run = [label]
 
     if input.get('downstream'):
         to_run = full_task_graph.graph.transitive_closure(set(to_run), reverse=True).nodes
         to_run = to_run & set(label_to_taskid.keys())
--- a/taskcluster/taskgraph/actions/run_missing_tests.py
+++ b/taskcluster/taskgraph/actions/run_missing_tests.py
@@ -4,19 +4,18 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 from .registry import register_callback_action
-from .util import create_tasks, find_decision_task
+from .util import create_tasks, fetch_graph_and_labels
 from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     name='run-missing-tests',
     title='Run Missing Tests',
     symbol='rmt',
@@ -25,22 +24,18 @@ logger = logging.getLogger(__name__)
         "\n"
         "This action is for use on pushes that will be merged into another branch,"
         "to check that optimization hasn't hidden any failures."
     ),
     order=100,  # Useful for sheriffs, but not top of the list
     context=[],  # Applies to decision task
 )
 def run_missing_tests(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
     target_tasks = get_artifact(decision_task_id, "public/target-tasks.json")
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
 
     # The idea here is to schedule all tasks of the `test` kind that were
     # targetted but did not appear in the final task-graph -- those were the
     # optimized tasks.
     to_run = []
     already_run = 0
     for label in target_tasks:
         task = full_task_graph.tasks[label]
--- a/taskcluster/taskgraph/actions/util.py
+++ b/taskcluster/taskgraph/actions/util.py
@@ -1,30 +1,61 @@
 # -*- coding: utf-8 -*-
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import logging
+
+from requests.exceptions import HTTPError
+
 from taskgraph import create
+from taskgraph.decision import write_artifact
 from taskgraph.taskgraph import TaskGraph
 from taskgraph.optimize import optimize_task_graph
-from taskgraph.util.taskcluster import get_session, find_task_id
+from taskgraph.util.taskcluster import get_session, find_task_id, get_artifact, list_tasks
+
+logger = logging.getLogger(__name__)
 
 
 def find_decision_task(parameters):
     """Given the parameters for this action, find the taskId of the decision
     task"""
     return find_task_id('gecko.v2.{}.pushlog-id.{}.decision'.format(
         parameters['project'],
         parameters['pushlog_id']))
 
 
+def fetch_graph_and_labels(parameters):
+    decision_task_id = find_decision_task(parameters)
+
+    # First grab the graph and labels generated during the initial decision task
+    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
+    _, full_task_graph = TaskGraph.from_json(full_task_graph)
+    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+    # Now fetch any modifications made by action tasks and swap out new tasks
+    # for old ones
+    namespace = 'gecko.v2.{}.pushlog-id.{}.actions'.format(
+        parameters['project'],
+        parameters['pushlog_id'])
+    for action in list_tasks(namespace):
+        try:
+            run_label_to_id = get_artifact(action, "public/label-to-taskid.json")
+            label_to_taskid.update(run_label_to_id)
+        except HTTPError as e:
+            logger.info('Skipping {} due to missing artifact! Error: {}'.format(action, e))
+            continue
+
+    return (decision_task_id, full_task_graph, label_to_taskid)
+
+
 def create_task_from_def(task_id, task_def, level):
     """Create a new task from a definition rather than from a label
     that is already in the full-task-graph. The task definition will
     have {relative-datestamp': '..'} rendered just like in a decision task.
     Use this for entirely new tasks or ones that change internals of the task.
     It is useful if you want to "edit" the full_task_graph and then hand
     it to this function. No dependencies will be scheduled. You must handle
     this yourself. Seeing how create_tasks handles it might prove helpful."""
@@ -44,9 +75,12 @@ def create_tasks(to_run, full_task_graph
     target_graph = full_task_graph.graph.transitive_closure(to_run)
     target_task_graph = TaskGraph(
         {l: full_task_graph[l] for l in target_graph.nodes},
         target_graph)
     optimized_task_graph, label_to_taskid = optimize_task_graph(target_task_graph,
                                                                 params,
                                                                 to_run,
                                                                 label_to_taskid)
+    write_artifact('task-graph.json', optimized_task_graph.to_json())
+    write_artifact('label-to-taskid.json', label_to_taskid)
+    write_artifact('to-run.json', list(to_run))
     create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -1,16 +1,17 @@
 # -*- coding: utf-8 -*-
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import datetime
 import functools
 import yaml
 import requests
 from mozbuild.util import memoize
 from requests.packages.urllib3.util.retry import Retry
 from requests.adapters import HTTPAdapter
 
 _TC_ARTIFACT_LOCATION = \
@@ -22,19 +23,22 @@ def get_session():
     session = requests.Session()
     retry = Retry(total=5, backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
     session.mount('http://', HTTPAdapter(max_retries=retry))
     session.mount('https://', HTTPAdapter(max_retries=retry))
     return session
 
 
-def _do_request(url):
+def _do_request(url, content=None):
     session = get_session()
-    response = session.get(url, stream=True)
+    if content is None:
+        response = session.get(url, stream=True)
+    else:
+        response = session.post(url, json=content)
     if response.status_code >= 400:
         # Consume content before raise_for_status, so that the connection can be
         # reused.
         response.content
     response.raise_for_status()
     return response
 
 
@@ -69,22 +73,22 @@ def get_artifact(task_id, path, use_prox
     return _handle_artifact(path, response)
 
 
 def list_artifacts(task_id, use_proxy=False):
     response = _do_request(get_artifact_url(task_id, '', use_proxy).rstrip('/'))
     return response.json()['artifacts']
 
 
-def get_index_url(index_path, use_proxy=False):
+def get_index_url(index_path, use_proxy=False, multiple=False):
     if use_proxy:
-        INDEX_URL = 'http://taskcluster/index/v1/task/{}'
+        INDEX_URL = 'http://taskcluster/index/v1/task{}/{}'
     else:
-        INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
-    return INDEX_URL.format(index_path)
+        INDEX_URL = 'https://index.taskcluster.net/v1/task{}/{}'
+    return INDEX_URL.format('s' if multiple else '', index_path)
 
 
 def find_task_id(index_path, use_proxy=False):
     try:
         response = _do_request(get_index_url(index_path, use_proxy))
     except requests.exceptions.HTTPError as e:
         if e.response.status_code == 404:
             raise KeyError("index path {} not found".format(index_path))
@@ -93,16 +97,40 @@ def find_task_id(index_path, use_proxy=F
 
 
 def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
     full_path = index_path + '/artifacts/' + artifact_path
     response = _do_request(get_index_url(full_path, use_proxy))
     return _handle_artifact(full_path, response)
 
 
+def list_tasks(index_path, use_proxy=False):
+    """
+    Returns a list of task_ids where each task_id is indexed under a path
+    in the index. Results are sorted by expiration date from oldest to newest.
+    """
+    results = []
+    data = {}
+    while True:
+        response = _do_request(get_index_url(index_path, use_proxy, multiple=True), data)
+        response = response.json()
+        results += response['tasks']
+        if response.get('continuationToken'):
+            data = {'continuationToken': response.get('continuationToken')}
+        else:
+            break
+
+    # We can sort on expires because in the general case
+    # all of these tasks should be created with the same expires time so they end up in
+    # order from earliest to latest action. If more correctness is needed, consider
+    # fetching each task and sorting on the created date.
+    results.sort(key=lambda t: datetime.datetime.strptime(t['expires'], '%Y-%m-%dT%H:%M:%S.%fZ'))
+    return [t['taskId'] for t in results]
+
+
 def get_task_url(task_id, use_proxy=False):
     if use_proxy:
         TASK_URL = 'http://taskcluster/queue/v1/task/{}'
     else:
         TASK_URL = 'https://queue.taskcluster.net/v1/task/{}'
     return TASK_URL.format(task_id)