Bug 1289823 - Add backfilling as an action-task
author Brian Stack <bstack@mozilla.com>
Wed, 07 Dec 2016 13:33:20 -1000
changeset 448835 106da6ced6ed195624d8f5b6feca51b11162af6e
parent 444822 155343f671b514d4714b4c5e15cf35abc4a3a72f
child 539384 7f9711c6155bfbd2853884f43d285b4a348d371d
push id 38444
push user bstack@mozilla.com
push date Tue, 13 Dec 2016 02:37:55 +0000
bugs 1289823
milestone 53.0a1
Bug 1289823 - Add backfilling as an action-task
MozReview-Commit-ID: HALwE6Q0Lch
taskcluster/mach_commands.py
taskcluster/taskgraph/action.py
taskcluster/taskgraph/action.yml
taskcluster/taskgraph/decision.py
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -166,17 +166,17 @@ class MachCommands(MachCommandBase):
         try:
             self.setup_logging()
             return taskgraph.decision.taskgraph_decision(options)
         except Exception:
             traceback.print_exc()
             sys.exit(1)
 
     @SubCommand('taskgraph', 'action-task',
-                description="Run the action task")
+                description="Run the add-tasks task. DEPRECATED! Use 'add-tasks' instead.")
     @CommandArgument('--root', '-r',
                      default='taskcluster/ci',
                      help="root of the taskgraph definition relative to topsrcdir")
     @CommandArgument('--decision-id',
                      required=True,
                      help="Decision Task ID of the reference decision task")
     @CommandArgument('--task-labels',
                      required=True,
@@ -184,17 +184,68 @@ class MachCommands(MachCommandBase):
     def taskgraph_action(self, **options):
         """Run the action task: Generates a task graph using the set of labels
         provided in the task-labels parameter. It uses the full-task file of
         the gecko decision task."""
 
         import taskgraph.action
         try:
             self.setup_logging()
-            return taskgraph.action.taskgraph_action(options)
+            return taskgraph.action.add_tasks(options['decision_id'],
+                                              options['task_labels'].split(','))
+        except Exception:
+            traceback.print_exc()
+            sys.exit(1)
+
+    @SubCommand('taskgraph', 'add-tasks',
+                description="Run the add-tasks task")
+    @CommandArgument('--root', '-r',
+                     default='taskcluster/ci',
+                     help="root of the taskgraph definition relative to topsrcdir")
+    @CommandArgument('--decision-id',
+                     required=True,
+                     help="Decision Task ID of the reference decision task")
+    @CommandArgument('--task-labels',
+                     required=True,
+                     help='Comma separated list of task labels to be scheduled')
+    def taskgraph_add_tasks(self, **options):
+        """Run the action task: Generates a task graph using the set of labels
+        provided in the task-labels parameter. It uses the full-task file of
+        the gecko decision task."""
+
+        import taskgraph.action
+        try:
+            self.setup_logging()
+            return taskgraph.action.add_tasks(options['decision_id'],
+                                              options['task_labels'].split(','))
+        except Exception:
+            traceback.print_exc()
+            sys.exit(1)
+
+    @SubCommand('taskgraph', 'backfill',
+                description="Run the backfill task")
+    @CommandArgument('--root', '-r',
+                     default='taskcluster/ci',
+                     help="root of the taskgraph definition relative to topsrcdir")
+    @CommandArgument('--project',
+                     required=True,
+                     help="Project of the jobs that need to be backfilled.")
+    @CommandArgument('--job-id',
+                     required=True,
+                     help="Id of the job to be backfilled.")
+    def taskgraph_backfill(self, **options):
+        """Run the backfill task: Given a job in a project, it will
+        add that job type to any previous revisions in treeherder
+        until either a hard limit is met or a green version of that
+        job is found."""
+
+        import taskgraph.action
+        try:
+            self.setup_logging()
+            return taskgraph.action.backfill(options['project'], options['job_id'])
         except Exception:
             traceback.print_exc()
             sys.exit(1)
 
     def setup_logging(self, quiet=False, verbose=True):
         """
         Set up Python logging for all loggers, sending results to stderr (so
         that command output can be redirected easily) and adding the typical
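
Both the deprecated `action-task` subcommand and its `add-tasks` replacement now funnel into the same `taskgraph.action.add_tasks` entry point, and `backfill` delegates to `taskgraph.action.backfill`. A minimal sketch of the equivalent direct calls (the decision task ID, labels, project, and job id below are placeholders):

    import taskgraph.action

    # Equivalent to: mach taskgraph add-tasks --decision-id=... --task-labels=...
    taskgraph.action.add_tasks('abc123DEFxyz',
                               'build-linux64/opt,build-linux64/debug'.split(','))

    # Equivalent to: mach taskgraph backfill --project=... --job-id=...
    taskgraph.action.backfill('mozilla-inbound', '12345678')
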
--- a/taskcluster/taskgraph/action.py
+++ b/taskcluster/taskgraph/action.py
@@ -12,57 +12,129 @@ import requests
 import yaml
 
 from .create import create_tasks
 from .decision import write_artifact
 from .optimize import optimize_task_graph
 from .taskgraph import TaskGraph
 
 logger = logging.getLogger(__name__)
-TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task/"
+TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
+TREEHERDER_URL = "https://treeherder.mozilla.org/api"
+
+# We set this to 5 for now because this is what SETA sets the
+# count to for every repository/job. If this is ever changed,
+# we'll need to have an API added to Treeherder to let us query
+# how far back we should look.
+MAX_BACKFILL_RESULTSETS = 5
 
 
-def taskgraph_action(options):
+def add_tasks(decision_task_id, task_labels, prefix=''):
     """
-    Run the action task.  This function implements `mach taskgraph action-task`,
+    Run the add-tasks task.  This function implements `mach taskgraph add-tasks`,
     and is responsible for
 
     * creating a task graph of the tasks requested in the parameters, relative
     to a given gecko decision task, and scheduling those tasks.
     """
-
-    decision_task_id = options['decision_id']
     # read in the full graph for reference
     full_task_json = get_artifact(decision_task_id, "public/full-task-graph.json")
     decision_params = get_artifact(decision_task_id, "public/parameters.yml")
     all_tasks, full_task_graph = TaskGraph.from_json(full_task_json)
 
-    target_tasks = set(options['task_labels'].split(','))
+    target_tasks = set(task_labels)
     target_graph = full_task_graph.graph.transitive_closure(target_tasks)
     target_task_graph = TaskGraph(
         {l: all_tasks[l] for l in target_graph.nodes},
         target_graph)
 
     existing_tasks = get_artifact(decision_task_id, "public/label-to-taskid.json")
 
     # We don't want to optimize the target tasks, since they have been requested by the user.
     # Hence we put `target_tasks` under `do_not_optimize`.
     optimized_graph, label_to_taskid = optimize_task_graph(target_task_graph=target_task_graph,
                                                            params=decision_params,
                                                            do_not_optimize=target_tasks,
                                                            existing_tasks=existing_tasks)
 
     # write out the optimized task graph to describe what will actually happen,
     # and the map of labels to taskids
-    write_artifact('task-graph.json', optimized_graph.to_json())
-    write_artifact('label-to-taskid.json', label_to_taskid)
+    write_artifact('{}task-graph.json'.format(prefix), optimized_graph.to_json())
+    write_artifact('{}label-to-taskid.json'.format(prefix), label_to_taskid)
     # actually create the graph
     create_tasks(optimized_graph, label_to_taskid, decision_params)
 
 
 def get_artifact(task_id, path):
-    url = TASKCLUSTER_QUEUE_URL + task_id + "/artifacts/" + path
-    resp = requests.get(url=url)
+    resp = requests.get(url="{}/{}/artifacts/{}".format(TASKCLUSTER_QUEUE_URL, task_id, path))
     if path.endswith('.json'):
         artifact = json.loads(resp.text)
     elif path.endswith('.yml'):
         artifact = yaml.load(resp.text)
     return artifact
+
+
+def backfill(project, job_id):
+    """
+    Run the backfill task.  This function implements `mach taskgraph backfill`,
+    and is responsible for
+
+     * Scheduling backfill jobs from a given treeherder resultset backwards until either
+     a successful job is found or MAX_BACKFILL_RESULTSETS resultsets have been examined.
+    """
+    s = requests.Session()
+    s.headers.update({"User-Agent": "gecko-intree-backfill-task"})
+
+    job = s.get(url="{}/project/{}/jobs/{}/".format(TREEHERDER_URL, project, job_id)).json()
+
+    if job["build_system_type"] != "taskcluster":
+        logger.warning("Invalid build system type! Must be a Taskcluster job. Aborting.")
+        return
+
+    filters = dict((k, job[k]) for k in ("build_platform_id", "platform_option", "job_type_id"))
+
+    resultset_url = "{}/project/{}/resultset/".format(TREEHERDER_URL, project)
+    params = {"id__lt": job["result_set_id"], "count": MAX_BACKFILL_RESULTSETS}
+    results = s.get(url=resultset_url, params=params).json()["results"]
+    resultsets = [resultset["id"] for resultset in results]
+
+    for decision in load_decisions(s, project, resultsets, filters):
+        add_tasks(decision, [job["job_type_name"]], '{}-'.format(decision))
+
+
+def load_decisions(s, project, resultsets, filters):
+    """
+    Given a project, a list of resultset (push) ids, and a dict of filters,
+    return a list of taskIds from the corresponding decision tasks.
+    """
+    project_url = "{}/project/{}/jobs/".format(TREEHERDER_URL, project)
+    decision_url = "{}/jobdetail/".format(TREEHERDER_URL)
+    decisions = []
+    decision_ids = []
+
+    for resultset in resultsets:
+        unfiltered = []
+        offset = 0
+        jobs_per_call = 250
+        while True:
+            params = {"push_id": resultset, "count": jobs_per_call, "offset": offset}
+            results = s.get(url=project_url, params=params).json()["results"]
+            unfiltered += results
+            if len(results) < jobs_per_call:
+                break
+            offset += jobs_per_call
+        filtered = [j for j in unfiltered if all(j[k] == filters[k] for k in filters)]
+        if len(filtered) > 1:
+            raise Exception("Too many jobs matched. Aborting.")
+        elif len(filtered) == 1:
+            if filtered[0]["result"] == "success":
+                break
+        decisions += [t for t in unfiltered if t["job_type_name"] == "Gecko Decision Task"]
+
+    for decision in decisions:
+        params = {"job_guid": decision["job_guid"]}
+        details = s.get(url=decision_url, params=params).json()["results"]
+        inspect = [detail["url"] for detail in details if detail["value"] == "Inspect Task"][0]
+
+        # Pull out the taskId from the URL e.g.
+        # oN1NErz_Rf2DZJ1hi7YVfA from tools.taskcluster.net/task-inspector/#oN1NErz_Rf2DZJ1hi7YVfA/
+        decision_ids.append(inspect.partition('#')[-1].rpartition('/')[0])
+    return decision_ids
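
Two pieces of `load_decisions` are worth a worked example: the filter match against the reference job, and the taskId recovery from the "Inspect Task" URL. A quick sketch with illustrative values (the field values below are made up; the URL shape comes from the comment above):

    # A candidate job matches when it agrees with the reference job on all filter keys.
    filters = {"build_platform_id": 1, "platform_option": "opt", "job_type_id": 42}
    candidate = {"build_platform_id": 1, "platform_option": "opt", "job_type_id": 42,
                 "result": "success"}
    assert all(candidate[k] == filters[k] for k in filters)

    # The taskId is everything between the '#' and the trailing '/'.
    inspect = "https://tools.taskcluster.net/task-inspector/#oN1NErz_Rf2DZJ1hi7YVfA/"
    assert inspect.partition('#')[-1].rpartition('/')[0] == "oN1NErz_Rf2DZJ1hi7YVfA"
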
--- a/taskcluster/taskgraph/action.yml
+++ b/taskcluster/taskgraph/action.yml
@@ -54,17 +54,17 @@ payload:
     - /home/worker/bin/run-task
     - '--vcs-checkout=/home/worker/checkouts/gecko'
     - '--'
     - bash
     - -cx
     - >
         cd /home/worker/checkouts/gecko &&
         ln -s /home/worker/artifacts artifacts &&
-        ./mach --log-no-times taskgraph action-task {{action_args}}
+        ./mach --log-no-times taskgraph {{action}} {{action_args}}
 
   artifacts:
     'public':
       type: 'directory'
       path: '/home/worker/artifacts'
       expires: '{{#from_now}}7 days{{/from_now}}'
 
 extra:
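
For concreteness, once a triggered backfill action fills in the two placeholders, the payload above expands to a command along these lines (project and job id are placeholders):

        cd /home/worker/checkouts/gecko &&
        ln -s /home/worker/artifacts artifacts &&
        ./mach --log-no-times taskgraph backfill --project=mozilla-inbound --job-id=12345678
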
--- a/taskcluster/taskgraph/decision.py
+++ b/taskcluster/taskgraph/decision.py
@@ -176,13 +176,14 @@ def write_artifact(filename, data):
     else:
         raise TypeError("Don't know how to write to {}".format(filename))
 
 
 def get_action_yml(parameters):
     templates = Templates(os.path.join(GECKO, "taskcluster/taskgraph"))
     action_parameters = parameters.copy()
     action_parameters.update({
+        "action": "{{action}}",
         "action_args": "{{action_args}}",
         "from_now": json_time_from_now,
         "now": current_json_time()
     })
     return templates.load('action.yml', action_parameters)
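
Note the trick in `get_action_yml`: action.yml is rendered once at decision time, but `action` and `action_args` are substituted with their own placeholder strings, so the stored artifact still contains `{{action}}` and `{{action_args}}` for whoever triggers the action later. A minimal sketch of that two-pass substitution, with the Templates machinery stubbed out as plain string replacement (the `render` helper is illustrative, not the real Templates API):

    def render(template, params):
        # Stand-in for Templates.load(): substitute each {{key}} with its value.
        for key, value in params.items():
            template = template.replace('{{%s}}' % key, str(value))
        return template

    line = "./mach --log-no-times taskgraph {{action}} {{action_args}}"

    # Pass 1 (decision time): the placeholders map to themselves and survive.
    pass1 = render(line, {"action": "{{action}}", "action_args": "{{action_args}}"})
    assert pass1 == line

    # Pass 2 (action trigger): real values are filled in.
    pass2 = render(pass1, {"action": "backfill",
                           "action_args": "--project=try --job-id=123"})
    assert pass2 == "./mach --log-no-times taskgraph backfill --project=try --job-id=123"
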