Bug 1259627 - [tools] Use TC Queue API r=mtabara
MozReview-Commit-ID: 25hAYnaLqCg
--- a/buildfarm/release/release-runner.py
+++ b/buildfarm/release/release-runner.py
@@ -10,31 +10,32 @@ import subprocess
import hashlib
import functools
import shutil
import tempfile
import requests
from os import path
from optparse import OptionParser
from twisted.python.lockfile import FilesystemLock
-from taskcluster import Scheduler, Index, Queue
+from taskcluster import Index, Queue
from taskcluster.utils import slugId
import yaml
site.addsitedir(path.join(path.dirname(__file__), "../../lib/python"))
from kickoff import (get_partials, ReleaseRunner,
make_task_graph_strict_kwargs, long_revision,
get_l10n_config, get_en_US_config, email_release_drivers,
bump_version, get_funsize_product, get_mar_signing_format)
from kickoff.sanity.base import SanityException, is_candidate_release
from kickoff.sanity.revisions import RevisionsSanitizer
from kickoff.sanity.l10n import L10nSanitizer
from kickoff.sanity.partials import PartialsSanitizer
from kickoff.build_status import are_en_us_builds_completed
+from kickoff.tc import resolve_task, submit_parallelized
from release.info import readBranchConfig
from release.l10n import parsePlainL10nChangesets
from release.versions import getAppVersion
from util.hg import mercurial
from util.retry import retry
log = logging.getLogger(__name__)
@@ -338,19 +339,18 @@ def main(options):
beetmover_aws_access_key_id = config["beetmover"].get("aws_access_key_id")
beetmover_aws_secret_access_key = config["beetmover"].get("aws_secret_access_key")
gpg_key_path = config["signing"].get("gpg_key_path")
# TODO: replace release sanity with direct checks of en-US and l10n
# revisions (and other things if needed)
rr = ReleaseRunner(api_root=api_root, username=username, password=password)
- scheduler = Scheduler(retrying_tc_config)
index = Index(tc_config)
- queue = Queue(tc_config)
+ queue = Queue(retrying_tc_config)
# Main loop waits for new releases, processes them and exits.
while True:
try:
log.debug('Fetching release requests')
rr.get_release_requests([r['pattern'] for r in config['releases']])
if rr.new_releases:
new_releases = run_prebuild_sanity_checks(
@@ -406,17 +406,17 @@ def main(options):
final_verify_channels = release_channels
publish_to_balrog_channels = release_channels
push_to_releases_enabled = True
# XXX: Doesn't work with neither Fennec nor Thunderbird
platforms = branchConfig['release_platforms']
try:
- graph_id = slugId()
+ task_group_id = None
done = are_en_us_builds_completed(
index=index, release_name=release['name'],
submitted_at=release['submittedAt'],
revision=release['mozillaRevision'],
platforms=platforms, queue=queue,
tc_task_indexes=branchConfig['tc_indexes'][release['product']])
if not done:
log.info(
@@ -507,49 +507,53 @@ def main(options):
"publish_to_balrog_channels": publish_to_balrog_channels,
"snap_enabled": branchConfig.get("snap_enabled", {}).get(release["product"], False),
"update_verify_channel": branchConfig.get("update_verify_channel", {}).get(release["product"]),
"update_verify_requires_cdn_push": branchConfig.get("update_verify_requires_cdn_push", False),
}
# TODO: en-US validation for multiple tasks
# validate_graph_kwargs(queue, gpg_key_path, **kwargs)
- graph = make_task_graph_strict_kwargs(**kwargs)
- rr.update_status(release, "Submitting task graph")
- log.info("Task graph generated!")
+ task_group_id, toplevel_task_id, tasks = make_task_graph_strict_kwargs(**kwargs)
+ rr.update_status(release, "Submitting tasks")
+ log.info("Tasks generated!")
import pprint
- log.debug(pprint.pformat(graph, indent=4, width=160))
- print(scheduler.createTaskGraph(graph_id, graph))
+ for task_id, task_def in tasks.items():
+ log.debug("%s ->\n%s", task_id,
+ pprint.pformat(task_def, indent=4, width=160))
+ submit_parallelized(queue, tasks)
+ resolve_task(queue, toplevel_task_id)
rr.mark_as_completed(release)
l10n_url = rr.release_l10n_api.getL10nFullUrl(release['name'])
email_release_drivers(smtp_server=smtp_server, from_=notify_from,
to=notify_to, release=release,
- task_group_id=graph_id, l10n_url=l10n_url)
+ task_group_id=task_group_id, l10n_url=l10n_url)
except Exception as exception:
# We explicitly do not raise an error here because there's no
# reason not to start other releases if creating the Task Graph
# fails for another one. We _do_ need to set this in order to exit
# with the right code, though.
rc = 2
rr.mark_as_failed(
release,
'Failed to start release promotion (graph ID: %s). Error(s): %s' % (
- graph_id, exception)
+ task_group_id, exception)
)
log.exception('Failed to start release "%s" promotion for graph %s. Error(s): %s',
- release['name'], graph_id, exception)
+ release['name'], task_group_id, exception)
log.debug('Release failed: %s', release)
if rc != 0:
sys.exit(rc)
log.debug('Sleeping for %s seconds before polling again', sleeptime)
time.sleep(sleeptime)
+
if __name__ == '__main__':
parser = OptionParser(__doc__)
parser.add_option('-l', '--lockfile', dest='lockfile',
default=path.join(os.getcwd(), ".release-runner.lock"))
parser.add_option('-c', '--config', dest='config',
help='Configuration file')
options = parser.parse_args()[0]
--- a/buildfarm/release/releasetasks_graph_gen.py
+++ b/buildfarm/release/releasetasks_graph_gen.py
@@ -6,37 +6,37 @@ from optparse import OptionParser
import site
import yaml
site.addsitedir(os.path.join(os.path.dirname(__file__), "../../lib/python"))
from kickoff import get_partials, ReleaseRunner, make_task_graph_strict_kwargs
from kickoff import get_l10n_config, get_en_US_config, get_mar_signing_format
from kickoff import bump_version
+from kickoff.tc import resolve_task, submit_parallelized
from release.versions import getAppVersion
from util.file import load_config, get_config
-from taskcluster import Scheduler, Index, Queue
+from taskcluster import Index, Queue
from taskcluster.utils import slugId
log = logging.getLogger(__name__)
def main(release_runner_config, release_config, tc_config):
api_root = release_runner_config.get('api', 'api_root')
username = release_runner_config.get('api', 'username')
password = release_runner_config.get('api', 'password')
- scheduler = Scheduler(tc_config)
+ queue = Queue(tc_config)
index = Index(tc_config)
rr = ReleaseRunner(api_root=api_root, username=username, password=password)
- graph_id = slugId()
log.info('Generating task graph')
kwargs = {
# release-runner.ini
"signing_pvt_key": release_config['signing_pvt_key'],
"public_key": release_config['docker_worker_key'],
"balrog_username": release_config['balrog_username'],
"balrog_password": release_config['balrog_password'],
"beetmover_aws_access_key_id": release_config['beetmover_aws_access_key_id'],
@@ -117,22 +117,26 @@ def main(release_runner_config, release_
"postrelease_mark_as_shipped_enabled": release_config["postrelease_mark_as_shipped_enabled"],
# TODO: use [] when snaps_enabled is landed
"snap_enabled": release_config.get("snap_enabled", False),
"update_verify_channel": release_config["update_verify_channel"],
"update_verify_requires_cdn_push": release_config["update_verify_requires_cdn_push"],
"release_eta": release_config.get("release_eta"),
}
- graph = make_task_graph_strict_kwargs(**kwargs)
- log.info("Submitting task graph")
+ task_group_id, toplevel_task_id, tasks = make_task_graph_strict_kwargs(**kwargs)
+ log.info("Tasks generated!")
import pprint
- log.info(pprint.pformat(graph, indent=4, width=160))
+ for task_id, task_def in tasks.items():
+ log.debug("%s ->\n%s", task_id,
+ pprint.pformat(task_def, indent=4, width=160))
+
if not options.dry_run:
- print scheduler.createTaskGraph(graph_id, graph)
+ submit_parallelized(queue, tasks)
+ resolve_task(queue, toplevel_task_id)
def get_items_from_common_tc_task(common_task_id, tc_config):
tc_task_items = {}
queue = Queue(tc_config)
task = queue.task(common_task_id)
tc_task_items["version"] = task["extra"]["build_props"]["version"]
tc_task_items["build_number"] = task["extra"]["build_props"]["build_number"]
@@ -205,17 +209,18 @@ if __name__ == '__main__':
parser.error('Need to pass a branch and product config')
# load config files
release_runner_config = load_config(options.release_runner_ini)
tc_config = {
"credentials": {
"clientId": get_config(release_runner_config, "taskcluster", "client_id", None),
"accessToken": get_config(release_runner_config, "taskcluster", "access_token", None),
- }
+ },
+ "maxRetries": 12,
}
branch_product_config = load_branch_and_product_config(options.branch_and_product_config)
if release_runner_config.getboolean('release-runner', 'verbose'):
log_level = logging.DEBUG
else:
log_level = logging.INFO
logging.basicConfig(filename='releasetasks_graph_gen.log',
--- a/lib/python/kickoff/__init__.py
+++ b/lib/python/kickoff/__init__.py
@@ -359,20 +359,20 @@ def make_task_graph_strict_kwargs(appVer
update_verify_channel=update_verify_channel,
update_verify_requires_cdn_push=update_verify_requires_cdn_push,
funsize_product=funsize_product,
release_eta=release_eta,
)
if extra_balrog_submitter_params:
kwargs["extra_balrog_submitter_params"] = extra_balrog_submitter_params
- # don't import releasetasks until required within function impl to avoid global failures
- # during nosetests
- from releasetasks import make_task_graph
- return make_task_graph(**kwargs)
+ # don't import releasetasks until required within function impl to avoid
+ # global failures during nosetests
+ from releasetasks import make_tasks
+ return make_tasks(**kwargs)
def get_funsize_product(product_name):
if product_name == 'devedition': # See bug 1366075
return 'firefox'
return product_name
new file mode 100644
--- /dev/null
+++ b/lib/python/kickoff/tc.py
@@ -0,0 +1,41 @@
+import logging
+
+import concurrent.futures as futures
+
+log = logging.getLogger(__name__)
+CONCURRENCY = 50
+
+
+def resolve_task(queue, task_id, worker_id="releaserunner"):
+ curr_status = queue.status(task_id)
+ run_id = curr_status['status']['runs'][-1]['runId']
+ payload = {"workerGroup": curr_status['status']['workerType'],
+ "workerId": worker_id}
+ queue.claimTask(task_id, run_id, payload)
+ queue.reportCompleted(task_id, run_id)
+
+
+def submit_parallelized(queue, tasks):
+    """Submit topologically sorted tasks in parallel.
+
+    Adapted from https://dxr.mozilla.org/mozilla-central/rev/f0abd25e1f4acced652d180c34b7c9eda638deb1/taskcluster/taskgraph/create.py#28
+ """
+
+ def submit_task(t_id, t_def):
+ log.info("Submitting %s", t_id)
+ queue.createTask(t_id, t_def)
+
+ with futures.ThreadPoolExecutor(CONCURRENCY) as e:
+ fs = {}
+ for task_id, task_def in tasks.items():
+ deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
+ if dep in fs]
+ # Wait for dependencies before submitting this.
+ for f in futures.as_completed(deps_fs):
+ f.result()
+
+ fs[task_id] = e.submit(submit_task, task_id, task_def)
+
+ # Wait for all futures to complete.
+ for f in futures.as_completed(fs.values()):
+ f.result()