Bug 1391417 - Make retrigger act like it does in mozilla-taskcluster
MozReview-Commit-ID: 1UAjWqQvwy5
--- a/taskcluster/taskgraph/actions/retrigger.py
+++ b/taskcluster/taskgraph/actions/retrigger.py
@@ -1,68 +1,73 @@
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
-from .registry import register_task_action
+import logging
+
+from .util import (
+ create_tasks,
+ find_decision_task
+)
+from .registry import register_callback_action
+from taskgraph.util.taskcluster import get_artifact
+from taskgraph.taskgraph import TaskGraph
+
+logger = logging.getLogger(__name__)
-@register_task_action(
+@register_callback_action(
title='Retrigger',
name='retrigger',
+ symbol='rt',
description=(
'Create a clone of the task.\n\n'
- 'This does not update any dependencies or '
- 'cause any downstream tasks to be retriggered.'
),
order=1,
context=[{}],
+ schema={
+ 'type': 'object',
+ 'properties': {
+ 'downstream': {
+ 'type': 'boolean',
+ 'description': (
+ 'If true, downstream tasks from this one will be cloned as well. '
+ 'The dependencies will be updated to work with the new task at the root.'
+ ),
+ 'default': False,
+ },
+ 'times': {
+ 'type': 'integer',
+ 'default': 1,
+ 'minimum': 1,
+ 'maximum': 6,
+ 'title': 'Times',
+ 'description': 'How many times to run each task.',
+ }
+ }
+ }
)
-def retrigger_task_builder(parameters):
-
- new_expires = '30 days'
+def retrigger_action(parameters, input, task_group_id, task_id, task):
+ decision_task_id = find_decision_task(parameters)
- return {
- '$merge': [
- {'$eval': 'task'},
- {'created': {'$fromNow': ''}},
- {'deadline': {'$fromNow': '1 day'}},
- {'expires': {'$fromNow': new_expires}},
- {'payload': {
- '$merge': [
- {'$eval': 'task.payload'},
- {
- '$if': '"artifacts" in task.payload',
- 'then': {
- 'artifacts': {
- '$if': 'typeof(task.payload.artifacts) == "object"',
- 'then': {
- '$map': {'$eval': 'task.payload.artifacts'},
- 'each(artifact)': {
- '${artifact.key}': {
- '$merge': [
- {'$eval': 'artifact.val'},
- {'expires': {'$fromNow': new_expires}},
- ],
- },
- },
- },
- 'else': {
- '$map': {'$eval': 'task.payload.artifacts'},
- 'each(artifact)': {
- '$merge': [
- {'$eval': 'artifact'},
- {'expires': {'$fromNow': new_expires}},
- ],
- },
- },
- },
- },
- 'else': {},
- }
- ]
- }}
- ]
- }
+ full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
+ _, full_task_graph = TaskGraph.from_json(full_task_graph)
+ label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+ label = task['metadata']['name']
+ with_downstream = ' '
+ to_run = [label]
+
+ if input.get('downstream'):
+ to_run = full_task_graph.graph.transitive_closure(set(to_run), reverse=True).nodes
+ to_run = to_run & set(label_to_taskid.keys())
+ with_downstream = ' (with downstream) '
+
+ times = input.get('times', 1)
+ for i in xrange(times):
+ create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
+
+ logger.info('Scheduled {}{}(time {}/{})'.format(label, with_downstream, i+1, times))
--- a/taskcluster/taskgraph/graph.py
+++ b/taskcluster/taskgraph/graph.py
@@ -35,35 +35,50 @@ class Graph(object):
self.edges = edges
def __eq__(self, other):
return self.nodes == other.nodes and self.edges == other.edges
def __repr__(self):
return "<Graph nodes={!r} edges={!r}>".format(self.nodes, self.edges)
- def transitive_closure(self, nodes):
+ def transitive_closure(self, nodes, reverse=False):
"""
Return the transitive closure of <nodes>: the graph containing all
specified nodes as well as any nodes reachable from them, and any
intervening edges.
+
+ If `reverse` is true, the "reachability" will be reversed and this
+ will return the set of nodes that can reach the specified nodes.
+
+ Example
+ -------
+
+ a ------> b ------> c
+ |
+ `-------> d
+
+ transitive_closure([b]).nodes == set([a, b])
+ transitive_closure([c]).nodes == set([c, b, a])
+ transitive_closure([c], reverse=True).nodes == set([c])
+ transitive_closure([b], reverse=True).nodes == set([b, c, d])
"""
assert isinstance(nodes, set)
assert nodes <= self.nodes
# generate a new graph by expanding along edges until reaching a fixed
# point
new_nodes, new_edges = nodes, set()
nodes, edges = set(), set()
while (new_nodes, new_edges) != (nodes, edges):
nodes, edges = new_nodes, new_edges
add_edges = set((left, right, name)
for (left, right, name) in self.edges
- if left in nodes)
- add_nodes = set(right for (_, right, _) in add_edges)
+ if (right if reverse else left) in nodes)
+ add_nodes = set((left if reverse else right) for (left, right, _) in add_edges)
new_nodes = nodes | add_nodes
new_edges = edges | add_edges
return Graph(new_nodes, new_edges)
def visit_postorder(self):
"""
Generate a sequence of nodes in postorder, such that every node is
visited *after* any nodes it links to.