Bug 1259627 - [tools] Use TC Queue API r=mtabara draft
author Rail Aliiev <rail@mozilla.com>
Thu, 24 Aug 2017 16:02:12 +0300
changeset 8027 fcd429270616d7d5a9273170dac13788ae0365c5
parent 8026 366a7b278252cf7497e34fd79e3294da610b2db5
push id 235
push user bmo:rail@mozilla.com
push date Thu, 24 Aug 2017 13:02:26 +0000
reviewers mtabara
bugs 1259627
Bug 1259627 - [tools] Use TC Queue API r=mtabara MozReview-Commit-ID: 25hAYnaLqCg
buildfarm/release/release-runner.py
buildfarm/release/releasetasks_graph_gen.py
lib/python/kickoff/__init__.py
lib/python/kickoff/tc.py
--- a/buildfarm/release/release-runner.py
+++ b/buildfarm/release/release-runner.py
@@ -10,31 +10,32 @@ import subprocess
 import hashlib
 import functools
 import shutil
 import tempfile
 import requests
 from os import path
 from optparse import OptionParser
 from twisted.python.lockfile import FilesystemLock
-from taskcluster import Scheduler, Index, Queue
+from taskcluster import Index, Queue
 from taskcluster.utils import slugId
 import yaml
 
 site.addsitedir(path.join(path.dirname(__file__), "../../lib/python"))
 
 from kickoff import (get_partials, ReleaseRunner,
                      make_task_graph_strict_kwargs, long_revision,
                      get_l10n_config, get_en_US_config, email_release_drivers,
                      bump_version, get_funsize_product, get_mar_signing_format)
 from kickoff.sanity.base import SanityException, is_candidate_release
 from kickoff.sanity.revisions import RevisionsSanitizer
 from kickoff.sanity.l10n import L10nSanitizer
 from kickoff.sanity.partials import PartialsSanitizer
 from kickoff.build_status import are_en_us_builds_completed
+from kickoff.tc import resolve_task, submit_parallelized
 from release.info import readBranchConfig
 from release.l10n import parsePlainL10nChangesets
 from release.versions import getAppVersion
 from util.hg import mercurial
 from util.retry import retry
 
 log = logging.getLogger(__name__)
 
@@ -338,19 +339,18 @@ def main(options):
     beetmover_aws_access_key_id = config["beetmover"].get("aws_access_key_id")
     beetmover_aws_secret_access_key = config["beetmover"].get("aws_secret_access_key")
     gpg_key_path = config["signing"].get("gpg_key_path")
 
     # TODO: replace release sanity with direct checks of en-US and l10n
     # revisions (and other things if needed)
 
     rr = ReleaseRunner(api_root=api_root, username=username, password=password)
-    scheduler = Scheduler(retrying_tc_config)
     index = Index(tc_config)
-    queue = Queue(tc_config)
+    queue = Queue(retrying_tc_config)
 
     # Main loop waits for new releases, processes them and exits.
     while True:
         try:
             log.debug('Fetching release requests')
             rr.get_release_requests([r['pattern'] for r in config['releases']])
             if rr.new_releases:
                 new_releases = run_prebuild_sanity_checks(
@@ -406,17 +406,17 @@ def main(options):
             final_verify_channels = release_channels
             publish_to_balrog_channels = release_channels
             push_to_releases_enabled = True
 
         # XXX: Doesn't work with neither Fennec nor Thunderbird
         platforms = branchConfig['release_platforms']
 
         try:
-            graph_id = slugId()
+            task_group_id = None
             done = are_en_us_builds_completed(
                 index=index, release_name=release['name'],
                 submitted_at=release['submittedAt'],
                 revision=release['mozillaRevision'],
                 platforms=platforms, queue=queue,
                 tc_task_indexes=branchConfig['tc_indexes'][release['product']])
             if not done:
                 log.info(
@@ -507,49 +507,53 @@ def main(options):
                 "publish_to_balrog_channels": publish_to_balrog_channels,
                 "snap_enabled": branchConfig.get("snap_enabled", {}).get(release["product"], False),
                 "update_verify_channel": branchConfig.get("update_verify_channel", {}).get(release["product"]),
                 "update_verify_requires_cdn_push": branchConfig.get("update_verify_requires_cdn_push", False),
             }
 
             # TODO: en-US validation for multiple tasks
             # validate_graph_kwargs(queue, gpg_key_path, **kwargs)
-            graph = make_task_graph_strict_kwargs(**kwargs)
-            rr.update_status(release, "Submitting task graph")
-            log.info("Task graph generated!")
+            task_group_id, toplevel_task_id, tasks = make_task_graph_strict_kwargs(**kwargs)
+            rr.update_status(release, "Submitting tasks")
+            log.info("Tasks generated!")
             import pprint
-            log.debug(pprint.pformat(graph, indent=4, width=160))
-            print(scheduler.createTaskGraph(graph_id, graph))
+            for task_id, task_def in tasks.items():
+                log.debug("%s ->\n%s", task_id,
+                          pprint.pformat(task_def, indent=4, width=160))
+            submit_parallelized(queue, tasks)
+            resolve_task(queue, toplevel_task_id)
 
             rr.mark_as_completed(release)
             l10n_url = rr.release_l10n_api.getL10nFullUrl(release['name'])
             email_release_drivers(smtp_server=smtp_server, from_=notify_from,
                                   to=notify_to, release=release,
-                                  task_group_id=graph_id, l10n_url=l10n_url)
+                                  task_group_id=task_group_id, l10n_url=l10n_url)
         except Exception as exception:
             # We explicitly do not raise an error here because there's no
             # reason not to start other releases if creating the Task Graph
             # fails for another one. We _do_ need to set this in order to exit
             # with the right code, though.
             rc = 2
             rr.mark_as_failed(
                 release,
                 'Failed to start release promotion (graph ID: %s). Error(s): %s' % (
-                    graph_id, exception)
+                    task_group_id, exception)
             )
             log.exception('Failed to start release "%s" promotion for graph %s. Error(s): %s',
-                          release['name'], graph_id, exception)
+                          release['name'], task_group_id, exception)
             log.debug('Release failed: %s', release)
 
     if rc != 0:
         sys.exit(rc)
 
     log.debug('Sleeping for %s seconds before polling again', sleeptime)
     time.sleep(sleeptime)
 
+
 if __name__ == '__main__':
     parser = OptionParser(__doc__)
     parser.add_option('-l', '--lockfile', dest='lockfile',
                       default=path.join(os.getcwd(), ".release-runner.lock"))
     parser.add_option('-c', '--config', dest='config',
                       help='Configuration file')
 
     options = parser.parse_args()[0]
--- a/buildfarm/release/releasetasks_graph_gen.py
+++ b/buildfarm/release/releasetasks_graph_gen.py
@@ -6,37 +6,37 @@ from optparse import OptionParser
 import site
 import yaml
 
 site.addsitedir(os.path.join(os.path.dirname(__file__), "../../lib/python"))
 
 from kickoff import get_partials, ReleaseRunner, make_task_graph_strict_kwargs
 from kickoff import get_l10n_config, get_en_US_config, get_mar_signing_format
 from kickoff import bump_version
+from kickoff.tc import resolve_task, submit_parallelized
 
 from release.versions import getAppVersion
 from util.file import load_config, get_config
 
-from taskcluster import Scheduler, Index, Queue
+from taskcluster import Index, Queue
 from taskcluster.utils import slugId
 
 log = logging.getLogger(__name__)
 
 
 def main(release_runner_config, release_config, tc_config):
 
     api_root = release_runner_config.get('api', 'api_root')
     username = release_runner_config.get('api', 'username')
     password = release_runner_config.get('api', 'password')
 
-    scheduler = Scheduler(tc_config)
+    queue = Queue(tc_config)
     index = Index(tc_config)
 
     rr = ReleaseRunner(api_root=api_root, username=username, password=password)
-    graph_id = slugId()
     log.info('Generating task graph')
     kwargs = {
         # release-runner.ini
         "signing_pvt_key": release_config['signing_pvt_key'],
         "public_key": release_config['docker_worker_key'],
         "balrog_username": release_config['balrog_username'],
         "balrog_password": release_config['balrog_password'],
         "beetmover_aws_access_key_id": release_config['beetmover_aws_access_key_id'],
@@ -117,22 +117,26 @@ def main(release_runner_config, release_
         "postrelease_mark_as_shipped_enabled": release_config["postrelease_mark_as_shipped_enabled"],
         # TODO: use [] when snaps_enabled is landed
         "snap_enabled": release_config.get("snap_enabled", False),
         "update_verify_channel": release_config["update_verify_channel"],
         "update_verify_requires_cdn_push": release_config["update_verify_requires_cdn_push"],
         "release_eta": release_config.get("release_eta"),
     }
 
-    graph = make_task_graph_strict_kwargs(**kwargs)
-    log.info("Submitting task graph")
+    task_group_id, toplevel_task_id, tasks = make_task_graph_strict_kwargs(**kwargs)
+    log.info("Tasks generated!")
     import pprint
-    log.info(pprint.pformat(graph, indent=4, width=160))
+    for task_id, task_def in tasks.items():
+        log.debug("%s ->\n%s", task_id,
+                  pprint.pformat(task_def, indent=4, width=160))
+
     if not options.dry_run:
-        print scheduler.createTaskGraph(graph_id, graph)
+        submit_parallelized(queue, tasks)
+        resolve_task(queue, toplevel_task_id)
 
 
 def get_items_from_common_tc_task(common_task_id, tc_config):
     tc_task_items = {}
     queue = Queue(tc_config)
     task = queue.task(common_task_id)
     tc_task_items["version"] = task["extra"]["build_props"]["version"]
     tc_task_items["build_number"] = task["extra"]["build_props"]["build_number"]
@@ -205,17 +209,18 @@ if __name__ == '__main__':
         parser.error('Need to pass a branch and product config')
 
     # load config files
     release_runner_config = load_config(options.release_runner_ini)
     tc_config = {
         "credentials": {
             "clientId": get_config(release_runner_config, "taskcluster", "client_id", None),
             "accessToken": get_config(release_runner_config, "taskcluster", "access_token", None),
-        }
+        },
+        "maxRetries": 12,
     }
     branch_product_config = load_branch_and_product_config(options.branch_and_product_config)
 
     if release_runner_config.getboolean('release-runner', 'verbose'):
         log_level = logging.DEBUG
     else:
         log_level = logging.INFO
     logging.basicConfig(filename='releasetasks_graph_gen.log',
--- a/lib/python/kickoff/__init__.py
+++ b/lib/python/kickoff/__init__.py
@@ -359,20 +359,20 @@ def make_task_graph_strict_kwargs(appVer
         update_verify_channel=update_verify_channel,
         update_verify_requires_cdn_push=update_verify_requires_cdn_push,
         funsize_product=funsize_product,
         release_eta=release_eta,
     )
     if extra_balrog_submitter_params:
         kwargs["extra_balrog_submitter_params"] = extra_balrog_submitter_params
 
-    # don't import releasetasks until required within function impl to avoid global failures
-    # during nosetests
-    from releasetasks import make_task_graph
-    return make_task_graph(**kwargs)
+    # don't import releasetasks until required within function impl to avoid
+    # global failures during nosetests
+    from releasetasks import make_tasks
+    return make_tasks(**kwargs)
 
 
 def get_funsize_product(product_name):
     if product_name == 'devedition':    # See bug 1366075
         return 'firefox'
     return product_name
 
 
new file mode 100644
--- /dev/null
+++ b/lib/python/kickoff/tc.py
@@ -0,0 +1,41 @@
+import logging
+
+import concurrent.futures as futures
+
+log = logging.getLogger(__name__)
+CONCURRENCY = 50
+
+
+def resolve_task(queue, task_id, worker_id="releaserunner"):
+    curr_status = queue.status(task_id)
+    run_id = curr_status['status']['runs'][-1]['runId']
+    payload = {"workerGroup": curr_status['status']['workerType'],
+               "workerId": worker_id}
+    queue.claimTask(task_id, run_id, payload)
+    queue.reportCompleted(task_id, run_id)
+
+
+def submit_parallelized(queue, tasks):
+    """Submit topologically sorted tasks parallelized
+
+    Stolen from https://dxr.mozilla.org/mozilla-central/rev/f0abd25e1f4acced652d180c34b7c9eda638deb1/taskcluster/taskgraph/create.py#28
+    """
+
+    def submit_task(t_id, t_def):
+        log.info("Submitting %s", t_id)
+        queue.createTask(t_id, t_def)
+
+    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
+        fs = {}
+        for task_id, task_def in tasks.items():
+            deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
+                       if dep in fs]
+            # Wait for dependencies before submitting this.
+            for f in futures.as_completed(deps_fs):
+                f.result()
+
+            fs[task_id] = e.submit(submit_task, task_id, task_def)
+
+        # Wait for all futures to complete.
+        for f in futures.as_completed(fs.values()):
+            f.result()