Bug 1405740 - Add a cancel-all action task draft
authorBrian Stack <bstack@mozilla.com>
Thu, 05 Oct 2017 13:31:56 -0700
changeset 675815 55c0402474dd107dceed2d7e70dbc1358e71100d
parent 674486 65a5054a1f922b83929c80658062f441ca3da6a0
child 734719 51211bd445aa5ddc10cc47fa789b2b2f3505fb02
push id83251
push userbstack@mozilla.com
push dateThu, 05 Oct 2017 22:24:51 +0000
bugs1405740
milestone58.0a1
Bug 1405740 - Add a cancel-all action task MozReview-Commit-ID: 2V8ztB5Pdkc
taskcluster/taskgraph/actions/cancel_all.py
taskcluster/taskgraph/actions/registry.py
taskcluster/taskgraph/util/taskcluster.py
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/actions/cancel_all.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import concurrent.futures as futures
+import logging
+import os
+
+from taskgraph.util.taskcluster import get_session, cancel_task
+from .registry import register_callback_action
+
+# the maximum number of parallel cancelTask calls to make
+CONCURRENCY = 50
+
+base_url = 'https://queue.taskcluster.net/v1/{}'
+
+logger = logging.getLogger(__name__)
+
+
+def list_group(task_group_id, session):
+    params = {}
+    while True:
+        url = base_url.format('task-group/{}/list'.format(task_group_id))
+        response = session.get(url, stream=True, params=params)
+        response = response.json()
+        for task in [t['status'] for t in response['tasks']]:
+            if task['state'] in ['running', 'pending', 'unscheduled']:
+                yield task['taskId']
+        if response.get('continuationToken'):
+            params = {'continuationToken': response.get('continuationToken')}
+        else:
+            break
+
+
+@register_callback_action(
+    title='Cancel All',
+    name='cancel-all',
+    symbol='cAll',
+    description=(
+        'Cancel all running and pending tasks created by the decision task '
+        'this action task is associated with.'
+    ),
+    order=100,
+    context=[]
+)
+def cancel_all_action(parameters, input, task_group_id, task_id, task):
+    session = get_session()
+    own_task_id = os.environ.get('TASK_ID', '')
+    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
+        cancels_jobs = [
+            e.submit(cancel_task, t, use_proxy=True)
+            for t in list_group(task_group_id, session) if t != own_task_id
+        ]
+        for job in cancels_jobs:
+            job.result()
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -11,16 +11,17 @@ import os
 import inspect
 import re
 import yaml
 from slugid import nice as slugid
 from mozbuild.util import memoize
 from types import FunctionType
 from collections import namedtuple
 from taskgraph import create
+from taskgraph.util import taskcluster
 from taskgraph.util.docker import docker_image
 from taskgraph.parameters import Parameters
 
 
 GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
 
 actions = []
 callbacks = {}
@@ -235,28 +236,29 @@ def register_callback_action(name, title
                     'artifacts': {
                         'public': {
                             'type': 'directory',
                             'path': '/builds/worker/artifacts',
                             'expires': {'$fromNow': '1 year'},
                         },
                     },
                     'cache': {
-                        'level-{}-checkouts'.format(parameters['level']):
+                        'level-{}-checkouts-sparse-v1'.format(parameters['level']):
                             '/builds/worker/checkouts',
                     },
                     'features': {
                         'taskclusterProxy': True,
                         'chainOfTrust': True,
                     },
                     'image': docker_image('decision'),
                     'maxRunTime': 1800,
                     'command': [
                         '/builds/worker/bin/run-task',
                         '--vcs-checkout=/builds/worker/checkouts/gecko',
+                        '--sparse-profile=build/sparse-profiles/taskgraph',
                         '--', 'bash', '-cx',
                         """\
 cd /builds/worker/checkouts/gecko &&
 ln -s /builds/worker/artifacts artifacts &&
 ./mach --log-no-times taskgraph action-callback""",
                     ],
                 },
                 'extra': {
@@ -331,16 +333,17 @@ def trigger_action_callback(task_group_i
     """
     cb = get_callbacks().get(callback, None)
     if not cb:
         raise Exception('Unknown callback: {}. Known callbacks: {}'.format(
             callback, get_callbacks().keys()))
 
     if test:
         create.testing = True
+        taskcluster.testing = True
 
     cb(Parameters(**parameters), input, task_group_id, task_id, task)
 
 
 @memoize
 def _load():
     # Load all modules from this folder, relying on the side-effects of register_
     # functions to populate the action registry.
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -5,23 +5,29 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import datetime
 import functools
 import yaml
 import requests
+import logging
 from mozbuild.util import memoize
 from requests.packages.urllib3.util.retry import Retry
 from requests.adapters import HTTPAdapter
 
 _TC_ARTIFACT_LOCATION = \
         'https://queue.taskcluster.net/v1/task/{task_id}/artifacts/public/build/{postfix}'
 
+logger = logging.getLogger(__name__)
+
+# this is set to true for `mach taskgraph action-callback --test`
+testing = False
+
 
 @memoize
 def get_session():
     session = requests.Session()
     retry = Retry(total=5, backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
     session.mount('http://', HTTPAdapter(max_retries=retry))
     session.mount('https://', HTTPAdapter(max_retries=retry))
@@ -134,13 +140,22 @@ def get_task_url(task_id, use_proxy=Fals
     return TASK_URL.format(task_id)
 
 
 def get_task_definition(task_id, use_proxy=False):
     response = _do_request(get_task_url(task_id, use_proxy))
     return response.json()
 
 
+def cancel_task(task_id, use_proxy=False):
+    """Cancels a task given a task_id. In testing mode, just logs that it would
+    have cancelled."""
+    if testing:
+        logger.info('Would have cancelled {}.'.format(task_id))
+    else:
+        _do_request(get_task_url(task_id, use_proxy) + '/cancel', content={})
+
+
 def get_taskcluster_artifact_prefix(task_id, postfix='', locale=None):
     if locale:
         postfix = '{}/{}'.format(locale, postfix)
 
     return _TC_ARTIFACT_LOCATION.format(task_id=task_id, postfix=postfix)