Bug 1405740 - Add a cancel-all action task
MozReview-Commit-ID: 2V8ztB5Pdkc
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/actions/cancel_all.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import concurrent.futures as futures
+import logging
+import os
+
+from taskgraph.util.taskcluster import get_session, cancel_task
+from .registry import register_callback_action
+
+# the maximum number of parallel cancelTask calls to make
+CONCURRENCY = 50
+
+base_url = 'https://queue.taskcluster.net/v1/{}'
+
+logger = logging.getLogger(__name__)
+
+
+def list_group(task_group_id, session):
+ params = {}
+ while True:
+ url = base_url.format('task-group/{}/list'.format(task_group_id))
+ response = session.get(url, stream=True, params=params)
+ response = response.json()
+ for task in [t['status'] for t in response['tasks']]:
+ if task['state'] in ['running', 'pending', 'unscheduled']:
+ yield task['taskId']
+ if response.get('continuationToken'):
+ params = {'continuationToken': response.get('continuationToken')}
+ else:
+ break
+
+
+@register_callback_action(
+ title='Cancel All',
+ name='cancel-all',
+ symbol='cAll',
+ description=(
+ 'Cancel all running and pending tasks created by the decision task '
+ 'this action task is associated with.'
+ ),
+ order=100,
+ context=[]
+)
+def cancel_all_action(parameters, input, task_group_id, task_id, task):
+ session = get_session()
+ own_task_id = os.environ.get('TASK_ID', '')
+ with futures.ThreadPoolExecutor(CONCURRENCY) as e:
+ cancels_jobs = [
+ e.submit(cancel_task, t, use_proxy=True)
+ for t in list_group(task_group_id, session) if t != own_task_id
+ ]
+ for job in cancels_jobs:
+ job.result()
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -11,16 +11,17 @@ import os
import inspect
import re
import yaml
from slugid import nice as slugid
from mozbuild.util import memoize
from types import FunctionType
from collections import namedtuple
from taskgraph import create
+from taskgraph.util import taskcluster
from taskgraph.util.docker import docker_image
from taskgraph.parameters import Parameters
GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
actions = []
callbacks = {}
@@ -235,28 +236,29 @@ def register_callback_action(name, title
'artifacts': {
'public': {
'type': 'directory',
'path': '/builds/worker/artifacts',
'expires': {'$fromNow': '1 year'},
},
},
'cache': {
- 'level-{}-checkouts'.format(parameters['level']):
+ 'level-{}-checkouts-sparse-v1'.format(parameters['level']):
'/builds/worker/checkouts',
},
'features': {
'taskclusterProxy': True,
'chainOfTrust': True,
},
'image': docker_image('decision'),
'maxRunTime': 1800,
'command': [
'/builds/worker/bin/run-task',
'--vcs-checkout=/builds/worker/checkouts/gecko',
+ '--sparse-profile=build/sparse-profiles/taskgraph',
'--', 'bash', '-cx',
"""\
cd /builds/worker/checkouts/gecko &&
ln -s /builds/worker/artifacts artifacts &&
./mach --log-no-times taskgraph action-callback""",
],
},
'extra': {
@@ -331,16 +333,17 @@ def trigger_action_callback(task_group_i
"""
cb = get_callbacks().get(callback, None)
if not cb:
raise Exception('Unknown callback: {}. Known callbacks: {}'.format(
callback, get_callbacks().keys()))
if test:
create.testing = True
+ taskcluster.testing = True
cb(Parameters(**parameters), input, task_group_id, task_id, task)
@memoize
def _load():
# Load all modules from this folder, relying on the side-effects of register_
# functions to populate the action registry.
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -5,23 +5,29 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import datetime
import functools
import yaml
import requests
+import logging
from mozbuild.util import memoize
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
_TC_ARTIFACT_LOCATION = \
'https://queue.taskcluster.net/v1/task/{task_id}/artifacts/public/build/{postfix}'
+logger = logging.getLogger(__name__)
+
+# this is set to true for `mach taskgraph action-callback --test`
+testing = False
+
@memoize
def get_session():
session = requests.Session()
retry = Retry(total=5, backoff_factor=0.1,
status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retry))
session.mount('https://', HTTPAdapter(max_retries=retry))
@@ -134,13 +140,22 @@ def get_task_url(task_id, use_proxy=Fals
return TASK_URL.format(task_id)
def get_task_definition(task_id, use_proxy=False):
response = _do_request(get_task_url(task_id, use_proxy))
return response.json()
+def cancel_task(task_id, use_proxy=False):
+ """Cancels a task given a task_id. In testing mode, just logs that it would
+ have cancelled."""
+ if testing:
+ logger.info('Would have cancelled {}.'.format(task_id))
+ else:
+ _do_request(get_task_url(task_id, use_proxy) + '/cancel', content={})
+
+
def get_taskcluster_artifact_prefix(task_id, postfix='', locale=None):
if locale:
postfix = '{}/{}'.format(locale, postfix)
return _TC_ARTIFACT_LOCATION.format(task_id=task_id, postfix=postfix)