Bug 1401995 Update funsize to use async, to reduce task time r=jlorenzo
authorSimon Fraser <sfraser@mozilla.com>
Wed, 03 Jan 2018 14:42:47 +0000
changeset 717197 a2b4d681ee3dbec82a523b39aff6d9e2e69d42f5
parent 715271 ac93fdadf1022211eec62258ad22b42cb37a6d14
child 745184 658068d6301dee897baa7c3021802925c45f5749
push id 94595
push user sfraser@mozilla.com
push date Mon, 08 Jan 2018 12:11:30 +0000
reviewers jlorenzo
bugs 1401995
milestone 59.0a1
Bug 1401995 Update funsize to use async, to reduce task time r=jlorenzo MozReview-Commit-ID: 24IU3pcJseY
taskcluster/docker/funsize-update-generator/requirements.txt
taskcluster/docker/funsize-update-generator/runme.sh
taskcluster/docker/funsize-update-generator/scripts/funsize.py
tools/lint/py2.yml
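
In outline, the patch converts funsize's sequential per-partial loop into coroutines scheduled together, which is where the task-time reduction comes from. A minimal sketch of that fan-out pattern; build_partial and run_all are illustrative names, not part of the patch:

import asyncio

async def build_partial(definition):
    # Stand-in for manage_partial(): download, unpack, diff, hash.
    await asyncio.sleep(0)  # placeholder for the real awaited I/O
    return {"mar": definition["name"]}

async def run_all(definitions):
    # One task per partial; downloads and subprocesses overlap on I/O
    # instead of running one partial at a time.
    tasks = [asyncio.ensure_future(build_partial(d)) for d in definitions]
    return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
manifest = loop.run_until_complete(run_all([{"name": "a"}, {"name": "b"}]))
loop.close()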
--- a/taskcluster/docker/funsize-update-generator/requirements.txt
+++ b/taskcluster/docker/funsize-update-generator/requirements.txt
@@ -1,5 +1,7 @@
 mar==2.1.2
 backports.lzma==0.0.8
 datadog==0.17.0
 redo==1.6
+aiohttp==2.3.6
 awscli==1.14.10
+scriptworker==6.0.0
--- a/taskcluster/docker/funsize-update-generator/runme.sh
+++ b/taskcluster/docker/funsize-update-generator/runme.sh
@@ -13,17 +13,18 @@ curl --location --retry 10 --retry-delay
     "https://queue.taskcluster.net/v1/task/$TASK_ID"
 
 # auth:aws-s3:read-write:tc-gp-private-1d-us-east-1/releng/mbsdiff-cache/
 # -> bucket of tc-gp-private-1d-us-east-1, path of releng/mbsdiff-cache/
 # Trailing slash is important, due to prefix permissions in S3.
 S3_BUCKET_AND_PATH=$(jq -r '.scopes[] | select(contains ("auth:aws-s3"))' /home/worker/task.json | awk -F: '{print $4}')
 
 # Will be empty if there's no scope for AWS S3.
-if [ -n "${S3_BUCKET_AND_PATH}" ]; then
+if [ -n "${S3_BUCKET_AND_PATH}" ] && getent hosts taskcluster
+then
   # Does this parse as we expect?
   S3_PATH=${S3_BUCKET_AND_PATH#*/}
   AWS_BUCKET_NAME=${S3_BUCKET_AND_PATH%/${S3_PATH}*}
   test "${S3_PATH}"
   test "${AWS_BUCKET_NAME}"
 
   set +x  # Don't echo these.
   secret_url="taskcluster/auth/v1/aws/s3/read-write/${AWS_BUCKET_NAME}/${S3_PATH}"
--- a/taskcluster/docker/funsize-update-generator/scripts/funsize.py
+++ b/taskcluster/docker/funsize-update-generator/scripts/funsize.py
@@ -1,35 +1,44 @@
 #!/usr/bin/env python3
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, division, print_function
 
+import asyncio
+import aiohttp
 import configparser
 import argparse
 import hashlib
 import json
 import logging
 import os
 import shutil
 import tempfile
 import time
 import requests
 import sh
 
 import redo
+from scriptworker.utils import retry_async
 from mardor.reader import MarReader
 from mardor.signing import get_keysize
 
 from datadog import initialize, ThreadStats
 
 
 log = logging.getLogger(__name__)
+
+# Create this even when not sending metrics, so the context manager
+# statements work.
+ddstats = ThreadStats(namespace='releng.releases.partials')
+
+
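
Per the comment above, the ThreadStats instance is created unconditionally so the `with ddstats.timer(...)` blocks below stay valid even when no Datadog API key is available; until initialize() and ddstats.start() run, timings are recorded but never flushed anywhere. A minimal illustration, assuming the datadog package as pinned in requirements.txt:

from datadog import ThreadStats

stats = ThreadStats(namespace='example')
# No initialize()/start() has run, so nothing is ever sent; the
# context manager still times the block without raising.
with stats.timer('some.operation.time'):
    sum(range(1000))  # stand-in workload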
 ALLOWED_URL_PREFIXES = [
     "http://download.cdn.mozilla.net/pub/mozilla.org/firefox/nightly/",
     "http://download.cdn.mozilla.net/pub/firefox/nightly/",
     "https://mozilla-nightly-updates.s3.amazonaws.com",
     "https://queue.taskcluster.net/",
     "http://ftp.mozilla.org/",
     "http://download.mozilla.org/",
     "https://archive.mozilla.org/",
@@ -66,131 +75,178 @@ def get_secret(secret_name):
     # 403: If unauthorized, just give up.
     if r.status_code == 403:
         log.info("Unable to get secret key")
         return {}
     r.raise_for_status()
     return r.json().get('secret', {})
 
 
-@redo.retriable()
-def download(url, dest, mode=None):
-    log.debug("Downloading %s to %s", url, dest)
-    r = requests.get(url)
-    r.raise_for_status()
+async def retry_download(*args, **kwargs):  # noqa: E999
+    """Retry download() calls."""
+    await retry_async(
+        download,
+        retry_exceptions=(
+            aiohttp.ClientError,  # trailing comma keeps this a tuple
+        ),
+        args=args,
+        kwargs=kwargs
+    )
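
scriptworker's retry_async awaits func(*args, **kwargs) and re-invokes it, sleeping between attempts, whenever one of retry_exceptions is raised. A hedged usage sketch (the URL is illustrative; attempt count and backoff are scriptworker's defaults, not set here):

import asyncio
import aiohttp
from scriptworker.utils import retry_async

async def fetch_text(url):
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        async with session.get(url) as resp:
            return await resp.text()

loop = asyncio.get_event_loop()
# Retries fetch_text() on any aiohttp.ClientError, as retry_download()
# above does for download().
body = loop.run_until_complete(retry_async(
    fetch_text,
    retry_exceptions=(aiohttp.ClientError,),
    args=("https://archive.mozilla.org/",),
))
loop.close()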
+
+
+async def download(url, dest, mode=None):  # noqa: E999
+    log.info("Downloading %s to %s", url, dest)
 
     bytes_downloaded = 0
-    with open(dest, 'wb') as fd:
-        for chunk in r.iter_content(4096):
-            fd.write(chunk)
-            bytes_downloaded += len(chunk)
 
-    log.debug('Downloaded %s bytes', bytes_downloaded)
-    if 'content-length' in r.headers:
-        log.debug('Content-Length: %s bytes', r.headers['content-length'])
-        if bytes_downloaded != int(r.headers['content-length']):
-            raise IOError('Unexpected number of bytes downloaded')
+    async with aiohttp.ClientSession(raise_for_status=True) as session:
+        async with session.get(url) as resp:
+            with open(dest, 'wb') as fd:
+                while True:
+                    chunk = await resp.content.read(4096)
+                    if not chunk:
+                        break
+                    fd.write(chunk)
+                    bytes_downloaded += len(chunk)
 
-    if mode:
-        log.debug("chmod %o %s", mode, dest)
-        os.chmod(dest, mode)
+            log.debug('Downloaded %s bytes', bytes_downloaded)
+            if 'content-length' in resp.headers:
+                log.debug('Content-Length: %s bytes', resp.headers['content-length'])
+                if bytes_downloaded != int(resp.headers['content-length']):
+                    raise IOError('Unexpected number of bytes downloaded')
+
+            if mode:
+                log.debug("chmod %o %s", mode, dest)
+                os.chmod(dest, mode)
 
 
-def unpack(work_env, mar, dest_dir):
+async def run_command(cmd, cwd='/', env=None, label=None, silent=False):
+    if not env:
+        env = dict()
+    process = await asyncio.create_subprocess_shell(cmd,
+                                                    stdout=asyncio.subprocess.PIPE,
+                                                    stderr=asyncio.subprocess.STDOUT,
+                                                    cwd=cwd, env=env)
+    stdout, _ = await process.communicate()  # stderr is merged into stdout above
+
+    if silent:
+        return
+
+    if not stdout:
+        stdout = b""
+
+    label = "{}: ".format(label) if label else ""
+
+    for line in stdout.splitlines():
+        log.debug("%s%s", label, line.decode('utf-8'))
+
+
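
Because stderr is redirected into stdout when the subprocess is created, callers receive a single merged stream, logged line by line under the given label. A usage sketch mirroring the clamscan invocation later in this patch:

loop = asyncio.get_event_loop()
loop.run_until_complete(
    run_command("clamscan -r /tmp/unpacked", label='clamscan'))
loop.close()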
+async def unpack(work_env, mar, dest_dir):
     os.mkdir(dest_dir)
-    unwrap_cmd = sh.Command(os.path.join(work_env.workdir,
-                                         "unwrap_full_update.pl"))
     log.debug("Unwrapping %s", mar)
     env = work_env.env
     if not is_lzma_compressed_mar(mar):
         env['MAR_OLD_FORMAT'] = '1'
     elif 'MAR_OLD_FORMAT' in env:
         del env['MAR_OLD_FORMAT']
-    out = unwrap_cmd(mar, _cwd=dest_dir, _env=env, _timeout=240,
-                     _err_to_out=True)
-    if out:
-        log.debug(out)
+
+    cmd = "{} {}".format(work_env.paths['unwrap_full_update.pl'], mar)
+    await run_command(cmd, cwd=dest_dir, env=env, label=dest_dir)
 
 
 def find_file(directory, filename):
     log.debug("Searching for %s in %s", filename, directory)
-    for root, dirs, files in os.walk(directory):
+    for root, _, files in os.walk(directory):
         if filename in files:
             f = os.path.join(root, filename)
             log.debug("Found %s", f)
             return f
 
 
 def get_option(directory, filename, section, option):
-    log.debug("Exctracting [%s]: %s from %s/**/%s", section, option, directory,
+    log.debug("Extracting [%s]: %s from %s/**/%s", section, option, directory,
               filename)
     f = find_file(directory, filename)
     config = configparser.ConfigParser()
     config.read(f)
     rv = config.get(section, option)
     log.debug("Found %s", rv)
     return rv
 
 
-def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids,
-                     version, use_old_format):
-    log.debug("Generating partial %s", dest_mar)
+async def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids,
+                           version, use_old_format):
+    log.info("Generating partial %s", dest_mar)
     env = work_env.env
     env["MOZ_PRODUCT_VERSION"] = version
     env["MOZ_CHANNEL_ID"] = channel_ids
     if use_old_format:
         env['MAR_OLD_FORMAT'] = '1'
     elif 'MAR_OLD_FORMAT' in env:
         del env['MAR_OLD_FORMAT']
     make_incremental_update = os.path.join(work_env.workdir,
                                            "make_incremental_update.sh")
-    out = sh.bash(make_incremental_update, dest_mar, from_dir, to_dir,
-                  _cwd=work_env.workdir, _env=env, _timeout=900,
-                  _err_to_out=True)
-    if out:
-        log.debug(out)
+    cmd = " ".join([make_incremental_update, dest_mar, from_dir, to_dir])
+
+    await run_command(cmd, cwd=work_env.workdir, env=env, label=dest_mar.split('/')[-1])
 
 
 def get_hash(path, hash_type="sha512"):
     h = hashlib.new(hash_type)
     with open(path, "rb") as f:
         h.update(f.read())
     return h.hexdigest()
 
 
 class WorkEnv(object):
 
     def __init__(self):
         self.workdir = tempfile.mkdtemp()
+        self.paths = {
+            'unwrap_full_update.pl': os.path.join(self.workdir, 'unwrap_full_update.pl'),
+            'mar': os.path.join(self.workdir, 'mar'),
+            'mbsdiff': os.path.join(self.workdir, 'mbsdiff')
+        }
 
-    def setup(self):
-        self.download_unwrap()
-        self.download_martools()
+    async def setup(self):
+        await self.download_unwrap()
+        await self.download_martools()
 
-    def download_unwrap(self):
+    async def clone(self, workenv):
+        for path in workenv.paths:
+            if os.path.exists(self.paths[path]):
+                os.unlink(self.paths[path])
+            os.link(workenv.paths[path], self.paths[path])
+
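
clone() replaces the removed "TODO: run setup once": the master WorkEnv downloads the tools a single time, and each per-partial WorkEnv hardlinks them into its own workdir. Hardlinks share one inode, so cloning costs no extra disk space or network traffic; a small demonstration of that property:

import os
import tempfile

src = tempfile.NamedTemporaryFile(delete=False).name
dst = src + '.clone'
os.link(src, dst)                  # second directory entry, same file
assert os.path.samefile(src, dst)  # one inode behind both names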
+    async def download_unwrap(self):
         # unwrap_full_update.pl is not too sensitive to the revision
         url = "https://hg.mozilla.org/mozilla-central/raw-file/default/" \
             "tools/update-packaging/unwrap_full_update.pl"
-        download(url, dest=os.path.join(self.workdir, "unwrap_full_update.pl"),
-                 mode=0o755)
+        await retry_download(url, dest=self.paths['unwrap_full_update.pl'], mode=0o755)
 
-    def download_buildsystem_bits(self, repo, revision):
+    async def download_buildsystem_bits(self, repo, revision):
         prefix = "{repo}/raw-file/{revision}/tools/update-packaging"
         prefix = prefix.format(repo=repo, revision=revision)
-        for f in ("make_incremental_update.sh", "common.sh"):
+        for f in ('make_incremental_update.sh', 'common.sh'):
             url = "{prefix}/{f}".format(prefix=prefix, f=f)
-            download(url, dest=os.path.join(self.workdir, f), mode=0o755)
+            await retry_download(url, dest=os.path.join(self.workdir, f), mode=0o755)
 
-    def download_martools(self):
+    async def download_martools(self):
         # TODO: check if the tools have to be branch specific
         prefix = "https://ftp.mozilla.org/pub/mozilla.org/firefox/nightly/" \
             "latest-mozilla-central/mar-tools/linux64"
-        for f in ("mar", "mbsdiff"):
+        for f in ('mar', 'mbsdiff'):
             url = "{prefix}/{f}".format(prefix=prefix, f=f)
-            download(url, dest=os.path.join(self.workdir, f), mode=0o755)
+            await retry_download(url, dest=self.paths[f], mode=0o755)
 
     def cleanup(self):
         shutil.rmtree(self.workdir)
 
     @property
     def env(self):
         my_env = os.environ.copy()
         my_env['LC_ALL'] = 'C'
@@ -201,16 +257,161 @@ class WorkEnv(object):
 
 def verify_allowed_url(mar):
     if not any(mar.startswith(prefix) for prefix in ALLOWED_URL_PREFIXES):
         raise ValueError("{mar} is not in allowed URL prefixes: {p}".format(
             mar=mar, p=ALLOWED_URL_PREFIXES
         ))
 
 
+async def manage_partial(partial_def, work_env, filename_template, artifacts_dir, signing_certs):
+    """Manage the creation of partial mars based on payload."""
+    for mar in (partial_def["from_mar"], partial_def["to_mar"]):
+        verify_allowed_url(mar)
+
+    complete_mars = {}
+    use_old_format = False
+
+    for mar_type, f in (("from", partial_def["from_mar"]), ("to", partial_def["to_mar"])):
+        dest = os.path.join(work_env.workdir, "{}.mar".format(mar_type))
+        unpack_dir = os.path.join(work_env.workdir, mar_type)
+
+        with ddstats.timer('mar.download.time'):
+            await retry_download(f, dest)
+
+        if not os.getenv("MOZ_DISABLE_MAR_CERT_VERIFICATION"):
+            verify_signature(dest, signing_certs)
+
+        complete_mars["%s_size" % mar_type] = os.path.getsize(dest)
+        complete_mars["%s_hash" % mar_type] = get_hash(dest)
+
+        with ddstats.timer('mar.unpack.time'):
+            await unpack(work_env, dest, unpack_dir)
+
+        if mar_type == 'from':
+            version = get_option(unpack_dir, filename="application.ini",
+                                 section="App", option="Version")
+            major = int(version.split(".")[0])
+            # The updater for versions less than 56.0 requires BZ2
+            # compressed MAR files
+            if major < 56:
+                use_old_format = True
+                log.info("Forcing BZ2 compression for %s", f)
+
+        log.info("AV-scanning %s ...", unpack_dir)
+        metric_tags = [
+            "platform:{}".format(partial_def['platform']),
+        ]
+        with ddstats.timer('mar.clamscan.time', tags=metric_tags):
+            await run_command("clamscan -r {}".format(unpack_dir), label='clamscan')
+        log.info("Done.")
+
+    to_path = os.path.join(work_env.workdir, "to")
+    from_path = os.path.join(work_env.workdir, "from")
+
+    mar_data = {
+        "ACCEPTED_MAR_CHANNEL_IDS": get_option(
+            to_path, filename="update-settings.ini", section="Settings",
+            option="ACCEPTED_MAR_CHANNEL_IDS"),
+        "version": get_option(to_path, filename="application.ini",
+                              section="App", option="Version"),
+        "to_buildid": get_option(to_path, filename="application.ini",
+                                 section="App", option="BuildID"),
+        "from_buildid": get_option(from_path, filename="application.ini",
+                                   section="App", option="BuildID"),
+        "appName": get_option(from_path, filename="application.ini",
+                              section="App", option="Name"),
+        # Use Gecko repo and rev from platform.ini, not application.ini
+        "repo": get_option(to_path, filename="platform.ini", section="Build",
+                           option="SourceRepository"),
+        "revision": get_option(to_path, filename="platform.ini",
+                               section="Build", option="SourceStamp"),
+        "from_mar": partial_def["from_mar"],
+        "to_mar": partial_def["to_mar"],
+        "platform": partial_def["platform"],
+        "locale": partial_def["locale"],
+    }
+    # Override ACCEPTED_MAR_CHANNEL_IDS if needed
+    if "ACCEPTED_MAR_CHANNEL_IDS" in os.environ:
+        mar_data["ACCEPTED_MAR_CHANNEL_IDS"] = os.environ["ACCEPTED_MAR_CHANNEL_IDS"]
+    for field in ("update_number", "previousVersion", "previousBuildNumber",
+                  "toVersion", "toBuildNumber"):
+        if field in partial_def:
+            mar_data[field] = partial_def[field]
+    mar_data.update(complete_mars)
+
+    # if branch not set explicitly use repo-name
+    mar_data['branch'] = partial_def.get('branch', mar_data['repo'].rstrip('/').split('/')[-1])
+
+    if 'dest_mar' in partial_def:
+        mar_name = partial_def['dest_mar']
+    else:
+        # default to formatted name if not specified
+        mar_name = filename_template.format(**mar_data)
+
+    mar_data['mar'] = mar_name
+    dest_mar = os.path.join(work_env.workdir, mar_name)
+
+    # TODO: download these once
+    await work_env.download_buildsystem_bits(repo=mar_data["repo"],
+                                             revision=mar_data["revision"])
+
+    metric_tags = [
+        "branch:{}".format(mar_data['branch']),
+        "platform:{}".format(mar_data['platform']),
+        # If required. Shouldn't add much useful info, but increases
+        # cardinality of metrics substantially, so avoided.
+        # "locale:{}".format(mar_data['locale']),
+    ]
+    with ddstats.timer('generate_partial.time', tags=metric_tags):
+        await generate_partial(work_env, from_path, to_path, dest_mar,
+                               mar_data["ACCEPTED_MAR_CHANNEL_IDS"],
+                               mar_data["version"],
+                               use_old_format)
+
+    mar_data["size"] = os.path.getsize(dest_mar)
+
+    metric_tags.append("unit:bytes")
+    # Allows us to find out how many releases there were between the two,
+    # making buckets of the file sizes easier.
+    metric_tags.append("update_number:{}".format(mar_data.get('update_number', 0)))
+    ddstats.gauge('partial_mar_size', mar_data['size'], tags=metric_tags)
+
+    mar_data["hash"] = get_hash(dest_mar)
+
+    shutil.copy(dest_mar, artifacts_dir)
+    work_env.cleanup()
+
+    return mar_data
+
+
+async def async_main(args, signing_certs):
+    tasks = []
+
+    master_env = WorkEnv()
+    await master_env.setup()
+
+    task = json.load(args.task_definition)
+    # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema
+    for definition in task["extra"]["funsize"]["partials"]:
+        workenv = WorkEnv()
+        await workenv.clone(master_env)
+        tasks.append(asyncio.ensure_future(manage_partial(
+            partial_def=definition,
+            filename_template=args.filename_template,
+            artifacts_dir=args.artifacts_dir,
+            work_env=workenv,
+            signing_certs=signing_certs)
+        ))
+
+    manifest = await asyncio.gather(*tasks)
+    master_env.cleanup()
+    return manifest
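
asyncio.gather returns results in the order the coroutines were passed in, so manifest entries line up with task["extra"]["funsize"]["partials"] regardless of which partial finishes first; the first exception from any manage_partial() propagates out of run_until_complete and fails the run. A tiny illustration of the ordering guarantee:

import asyncio

async def echo(i):
    await asyncio.sleep(0.01 * (3 - i))  # deliberately finish out of order
    return i

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(echo(0), echo(1), echo(2)))
assert results == [0, 1, 2]
loop.close()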
+
+
 def main():
 
     start = time.time()
 
     parser = argparse.ArgumentParser()
     parser.add_argument("--artifacts-dir", required=True)
     parser.add_argument("--sha1-signing-cert", required=True)
     parser.add_argument("--sha384-signing-cert", required=True)
@@ -222,37 +423,31 @@ def main():
                         help="Do not refresh ClamAV DB")
     parser.add_argument("-q", "--quiet", dest="log_level",
                         action="store_const", const=logging.WARNING,
                         default=logging.DEBUG)
     args = parser.parse_args()
 
     logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
     log.setLevel(args.log_level)
-    task = json.load(args.task_definition)
-    # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema
 
     signing_certs = {
         'sha1': open(args.sha1_signing_cert, 'rb').read(),
         'sha384': open(args.sha384_signing_cert, 'rb').read(),
     }
 
     assert(get_keysize(signing_certs['sha1']) == 2048)
     assert(get_keysize(signing_certs['sha384']) == 4096)
 
     # Intended for local testing.
     dd_api_key = os.environ.get('DATADOG_API_KEY')
     # Intended for Taskcluster.
     if not dd_api_key and os.environ.get('DATADOG_API_SECRET'):
         dd_api_key = get_secret(os.environ.get('DATADOG_API_SECRET')).get('key')
 
-    # Create this even when not sending metrics, so the context manager
-    # statements work.
-    ddstats = ThreadStats(namespace='releng.releases.partials')
-
     if dd_api_key:
         dd_options = {
             'api_key': dd_api_key,
         }
         log.info("Starting metric collection")
         initialize(**dd_options)
         ddstats.start(flush_interval=1)
     else:
@@ -264,139 +459,35 @@ def main():
         log.info("Refreshing clamav db...")
         try:
             redo.retry(lambda: sh.freshclam("--stdout", "--verbose",
                                             _timeout=300, _err_to_out=True))
             log.info("Done.")
         except sh.ErrorReturnCode:
             log.warning("Freshclam failed, skipping DB update")
 
-    manifest = []
-    for e in task["extra"]["funsize"]["partials"]:
-        for mar in (e["from_mar"], e["to_mar"]):
-            verify_allowed_url(mar)
-
-        work_env = WorkEnv()
-        # TODO: run setup once
-        work_env.setup()
-        complete_mars = {}
-        use_old_format = False
-        for mar_type, f in (("from", e["from_mar"]), ("to", e["to_mar"])):
-            dest = os.path.join(work_env.workdir, "{}.mar".format(mar_type))
-            unpack_dir = os.path.join(work_env.workdir, mar_type)
-            with ddstats.timer('mar.download.time'):
-                download(f, dest)
-            if not os.getenv("MOZ_DISABLE_MAR_CERT_VERIFICATION"):
-                verify_signature(dest, signing_certs)
-            complete_mars["%s_size" % mar_type] = os.path.getsize(dest)
-            complete_mars["%s_hash" % mar_type] = get_hash(dest)
-            with ddstats.timer('mar.unpack.time'):
-                unpack(work_env, dest, unpack_dir)
-            if mar_type == 'from':
-                version = get_option(unpack_dir, filename="application.ini",
-                                     section="App", option="Version")
-                major = int(version.split(".")[0])
-                # The updater for versions less than 56.0 requires BZ2
-                # compressed MAR files
-                if major < 56:
-                    use_old_format = True
-                    log.info("Forcing BZ2 compression for %s", f)
-            log.info("AV-scanning %s ...", unpack_dir)
-            metric_tags = [
-                "platform:{}".format(e['platform']),
-            ]
-            with ddstats.timer('mar.clamscan.time', tags=metric_tags):
-                sh.clamscan("-r", unpack_dir, _timeout=600, _err_to_out=True)
-            log.info("Done.")
-
-        path = os.path.join(work_env.workdir, "to")
-        from_path = os.path.join(work_env.workdir, "from")
-        mar_data = {
-            "ACCEPTED_MAR_CHANNEL_IDS": get_option(
-                path, filename="update-settings.ini", section="Settings",
-                option="ACCEPTED_MAR_CHANNEL_IDS"),
-            "version": get_option(path, filename="application.ini",
-                                  section="App", option="Version"),
-            "to_buildid": get_option(path, filename="application.ini",
-                                     section="App", option="BuildID"),
-            "from_buildid": get_option(from_path, filename="application.ini",
-                                       section="App", option="BuildID"),
-            "appName": get_option(from_path, filename="application.ini",
-                                  section="App", option="Name"),
-            # Use Gecko repo and rev from platform.ini, not application.ini
-            "repo": get_option(path, filename="platform.ini", section="Build",
-                               option="SourceRepository"),
-            "revision": get_option(path, filename="platform.ini",
-                                   section="Build", option="SourceStamp"),
-            "from_mar": e["from_mar"],
-            "to_mar": e["to_mar"],
-            "platform": e["platform"],
-            "locale": e["locale"],
-        }
-        # Override ACCEPTED_MAR_CHANNEL_IDS if needed
-        if "ACCEPTED_MAR_CHANNEL_IDS" in os.environ:
-            mar_data["ACCEPTED_MAR_CHANNEL_IDS"] = os.environ["ACCEPTED_MAR_CHANNEL_IDS"]
-        for field in ("update_number", "previousVersion",
-                      "previousBuildNumber", "toVersion",
-                      "toBuildNumber"):
-            if field in e:
-                mar_data[field] = e[field]
-        mar_data.update(complete_mars)
-        # if branch not set explicitly use repo-name
-        mar_data["branch"] = e.get("branch",
-                                   mar_data["repo"].rstrip("/").split("/")[-1])
-        if 'dest_mar' in e:
-            mar_name = e['dest_mar']
-        else:
-            # default to formatted name if not specified
-            mar_name = args.filename_template.format(**mar_data)
-        mar_data["mar"] = mar_name
-        dest_mar = os.path.join(work_env.workdir, mar_name)
-        # TODO: download these once
-        work_env.download_buildsystem_bits(repo=mar_data["repo"],
-                                           revision=mar_data["revision"])
-
-        metric_tags = [
-            "branch:{}".format(mar_data['branch']),
-            "platform:{}".format(mar_data['platform']),
-            # If required. Shouldn't add much useful info, but increases
-            # cardinality of metrics substantially, so avoided.
-            # "locale:{}".format(mar_data['locale']),
-        ]
-
-        with ddstats.timer('generate_partial.time', tags=metric_tags):
-            generate_partial(work_env, from_path, path, dest_mar,
-                             mar_data["ACCEPTED_MAR_CHANNEL_IDS"],
-                             mar_data["version"],
-                             use_old_format)
-
-        mar_data["size"] = os.path.getsize(dest_mar)
-        metric_tags.append("unit:bytes")
-        # Allows us to find out how many releases there were between the two,
-        # making buckets of the file sizes easier.
-        metric_tags.append("update_number:{}".format(mar_data.get('update_number', 0)))
-        ddstats.gauge('partial_mar_size', mar_data['size'], tags=metric_tags)
-
-        mar_data["hash"] = get_hash(dest_mar)
-
-        shutil.copy(dest_mar, args.artifacts_dir)
-        work_env.cleanup()
-        manifest.append(mar_data)
+    loop = asyncio.get_event_loop()
+    manifest = loop.run_until_complete(async_main(args, signing_certs))
+    loop.close()
 
     manifest_file = os.path.join(args.artifacts_dir, "manifest.json")
     with open(manifest_file, "w") as fp:
         json.dump(manifest, fp, indent=2, sort_keys=True)
 
+    log.debug("%s", json.dumps(manifest, indent=2, sort_keys=True))
+
     # Warning: Assumption that one partials task will always be for one branch.
     metric_tags = [
-        "branch:{}".format(mar_data['branch']),
+        "branch:{}".format(manifest[0]['branch']),
     ]
 
     ddstats.timing('task_duration', time.time() - start,
                    start, tags=metric_tags)
+
     # Wait for all the metrics to flush. If the program ends before
     # they've been sent, they'll be dropped.
     # Should be more than the flush_interval for the ThreadStats object
-    time.sleep(10)
+    if dd_api_key:
+        time.sleep(10)
 
 
 if __name__ == '__main__':
     main()
--- a/tools/lint/py2.yml
+++ b/tools/lint/py2.yml
@@ -29,16 +29,17 @@ py2:
         - probes/trace-gen.py
         - python/devtools
         - python/mach
         - python/mozbuild
         - python/mozversioncontrol
         - security
         - services/common/tests/mach_commands.py
         - servo
+        - taskcluster/docker/funsize-update-generator
         - testing/awsy
         - testing/firefox-ui
         - testing/geckodriver
         - testing/gtest
         - testing/marionette
         - testing/mochitest
         - testing/mozharness
         - testing/remotecppunittests.py