mozreview: added new "--forks" parallel build switch (bug 1277903) draft
author      Māris Fogels <mars@mozilla.com>
date        Wed, 08 Jun 2016 17:04:24 -0400
changeset   8691 1e45b7af22e6044e2eee5fdb0fa605eac47fccc2
parent      8690 b0b29eb4ec131cc58c64808e57bdeba6da71def4
child       8692 f1ffd671e4cdd134abfdef6b7fc8e64cfea48929
push id     966
push user   mfogels@mozilla.com
push date   Wed, 29 Jun 2016 17:13:14 +0000
bugs        1277903
mozreview: added new "--forks" parallel build switch (bug 1277903)

The new switch allows you to specify the number of parallel processes the
mozreview and d0cker programs use when building images and containers. The
switch is named after the same switch in ansible.

MozReview-Commit-ID: 2BDhRM3lnrz
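Editor's note: a minimal sketch (not part of the patch) of the capping semantics the
commit message describes — with --forks N, a pool that would have used `wanted`
threads uses min(wanted, N) instead. The helper name `capped_pool` here is
hypothetical; the patch's actual implementation is `limited_threadpoolexecutor`
in testing/vcttesting/util.py below.

    import concurrent.futures as futures

    def capped_pool(wanted, forks=None):
        """Return a ThreadPoolExecutor with at most ``forks`` worker threads."""
        workers = wanted if not forks or forks < 1 else min(wanted, forks)
        return futures.ThreadPoolExecutor(workers)

    # With --forks 4, a call site that asks for 10 workers gets 4.
    with capped_pool(10, forks=4) as e:
        results = list(e.map(lambda n: n * n, range(10)))
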
testing/vcttesting/docker.py
testing/vcttesting/docker_mach_commands.py
testing/vcttesting/mozreview.py
testing/vcttesting/mozreview_mach_commands.py
testing/vcttesting/util.py
--- a/testing/vcttesting/docker.py
+++ b/testing/vcttesting/docker.py
@@ -35,16 +35,17 @@ from docker.errors import (
 )
 from contextlib import contextmanager
 from io import BytesIO
 
 import concurrent.futures as futures
 from coverage.data import CoverageData
 
 from .util import (
+    limited_threadpoolexecutor,
     wait_for_amqp,
     wait_for_http,
     wait_for_ssh,
 )
 from .vctutil import (
     get_and_write_vct_node,
 )
 
@@ -332,17 +333,18 @@ class Docker(object):
 
             res = self.client.import_image_from_data(
                 fh, repository=repository, tag=tag)
             # docker-py doesn't parse the JSON response in what is almost
             # certainly a bug. Do it ourselves.
             return json.loads(res.strip())['status']
 
     def ensure_built(self, name, verbose=False, use_last=False):
-        """Ensure a Docker image from a builder directory is built and up to date.
+        """Ensure a Docker image from a builder directory is built and up to
+        date.
 
         This function is docker build++. Under the hood, it talks to the same
         ``build`` Docker API. However, it does one important thing differently:
         it builds the context archive manually.
 
         We supplement all contexts with the content of the source in this
         repository related to building Docker containers. This is done by
         scanning the Dockerfile for references to extra files to include.
@@ -496,27 +498,32 @@ class Docker(object):
                 if not have_tag:
                     self.client.tag(full_image, name, str(uuid.uuid1()))
 
                 return full_image
 
         raise Exception('Unable to confirm image was built: %s' % name)
 
     def ensure_images_built(self, names, ansibles=None, existing=None,
-                            verbose=False, use_last=False):
+                            verbose=False, use_last=False, max_workers=None):
         """Ensure that multiple images are built.
 
         ``names`` is a list of Docker images to build.
         ``ansibles`` describes how to build ansible-based images. Keys
         are repositories. Values are tuples of (playbook, builder). If an
         image in the specified repositories is found, we'll use it as the
         start image. Otherwise, we'll use the configured builder.
 
         If ``use_last`` is true, we will use the last built image instead
         of building a new one.
+
+        If ``max_workers`` is less than 1 or is None, use the default number
+        of worker threads to perform I/O intensive tasks.  Otherwise use the
+        specified number of threads.  Useful for debugging and reducing
+        load on resource-constrained machines.
         """
         ansibles = ansibles or {}
         existing = existing or {}
 
         # Verify existing images actually exist.
         docker_images = self.all_docker_images()
 
         images = {k: v for k, v in existing.items() if v in docker_images}
@@ -565,17 +572,17 @@ class Docker(object):
                                                 repository=repository,
                                                 builder=builder,
                                                 start_image=start_image,
                                                 vct_cid=vct_cid,
                                                 verbose=verbose)
             return repository, image
 
         with self.vct_container(verbose=verbose) as vct_state, \
-                futures.ThreadPoolExecutor(len(missing)) as e:
+                limited_threadpoolexecutor(len(missing), max_workers) as e:
             vct_cid = vct_state['Id']
             fs = []
             builder_fs = {}
             for n in sorted(missing):
                 if n in names:
                     fs.append(e.submit(build, n, verbose=verbose))
                 else:
                     playbook, builder = ansibles[n]
@@ -751,54 +758,55 @@ class Docker(object):
         self.state['last-bmoweb-bootstrap-id'] = bmoweb_bootstrap
         self.save_state()
 
         return {
             'bmoweb': bmoweb_bootstrap,
         }
 
     def build_mozreview(self, images=None, verbose=False, use_last=False,
-                        build_hgweb=True, build_bmo=True):
+                        build_hgweb=True, build_bmo=True, max_workers=None):
         """Ensure the images for a MozReview service are built.
 
         bmoweb's entrypoint does a lot of setup on first run. This takes many
         seconds to perform and this cost is unacceptable for efficient test
         running. So, when we build the BMO images, we create throwaway
         containers and commit the results to a new image. This allows us to
         spin up multiple bmoweb containers very quickly.
         """
         images = images or {}
 
         # Building BMO images is a 2 phase step: image build + bootstrap.
         # Because bootstrap can occur concurrently with other image
         # building, we build BMO images separately and initiate bootstrap
         # as soon as it is ready.
-        with futures.ThreadPoolExecutor(2) as e:
+        with limited_threadpoolexecutor(2, max_workers) as e:
             if build_bmo:
                 f_bmo_images = e.submit(self.build_bmo, images=images,
                                         verbose=verbose)
 
             ansibles = {
                 'hgrb': ('docker-hgrb', 'centos6'),
                 'rbweb': ('docker-rbweb', 'centos6'),
             }
             if build_hgweb:
                 ansibles['hgweb'] = ('docker-hgweb', 'centos6')
 
-            f_images = e.submit(
-                self.ensure_images_built,
-                [
-                    'autolanddb',
-                    'autoland',
-                    'ldap',
-                    'pulse',
-                    'treestatus',
-                ],
-                ansibles=ansibles, existing=images, verbose=verbose,
-                use_last=use_last)
+            f_images = e.submit(self.ensure_images_built, [
+                'autolanddb',
+                'autoland',
+                'ldap',
+                'pulse',
+                'treestatus',
+            ],
+                ansibles=ansibles,
+                existing=images,
+                verbose=verbose,
+                use_last=use_last,
+                max_workers=max_workers)
 
             if build_bmo:
                 bmo_images = f_bmo_images.result()
                 bmoweb_bootstrap = bmo_images['bmoweb']
 
         images.update(f_images.result())
 
         self.state['last-autolanddb-id'] = images['autolanddb']
@@ -916,17 +924,17 @@ class Docker(object):
     def start_mozreview(
             self, cluster, http_port=80,
             hgrb_image=None, ldap_image=None, ldap_port=None, pulse_port=None,
             rbweb_port=None, web_image=None, pulse_image=None,
             rbweb_image=None, ssh_port=None, hg_port=None,
             autolanddb_image=None, autoland_image=None, autoland_port=None,
             hgweb_image=None, hgweb_port=None,
             treestatus_image=None, treestatus_port=None,
-            verbose=False):
+            max_workers=None, verbose=False):
 
         start_ldap = False
         if ldap_port:
             start_ldap = True
 
         start_hgrb = False
         if ssh_port or hg_port:
             start_hgrb = True
@@ -972,30 +980,31 @@ class Docker(object):
             hgweb_image = None
         if treestatus_image and treestatus_image not in known_images:
             treestatus_image = None
 
         if (not web_image or not hgrb_image or not ldap_image
                 or not pulse_image or not autolanddb_image
                 or not autoland_image or not rbweb_image or not hgweb_image
                 or not treestatus_image):
-            images = self.build_mozreview(verbose=verbose)
+            images = self.build_mozreview(
+                max_workers=max_workers, verbose=verbose)
             autolanddb_image = images['autolanddb']
             autoland_image = images['autoland']
             hgrb_image = images['hgrb']
             ldap_image = images['ldap']
             web_image = images['bmoweb']
             pulse_image = images['pulse']
             rbweb_image = images['rbweb']
             hgweb_image = images['hgweb']
             treestatus_image = images['treestatus']
 
         containers = self.state['containers'].setdefault(cluster, [])
 
-        with futures.ThreadPoolExecutor(10) as e:
+        with limited_threadpoolexecutor(10, max_workers) as e:
             if start_pulse:
                 f_pulse_create = e.submit(
                     self.client.create_container,
                     pulse_image,
                     labels=['pulse'])
 
             bmo_url = 'http://%s:%s/' % (self.docker_hostname, http_port)
 
@@ -1190,17 +1199,17 @@ class Docker(object):
             hgweb_hostname, hgweb_hostport = \
                 self._get_host_hostname_port(hgweb_state, '80/tcp')
 
         if start_treestatus:
             treestatus_hostname, treestatus_hostport = \
                 self._get_host_hostname_port(treestatus_state, '80/tcp')
 
         fs = []
-        with futures.ThreadPoolExecutor(7) as e:
+        with limited_threadpoolexecutor(7, max_workers) as e:
             fs.append(e.submit(
                 wait_for_http, bmoweb_hostname, bmoweb_hostport,
                 extra_check_fn=self._get_assert_container_running_fn(web_id)))
             if start_pulse:
                 fs.append(e.submit(
                     wait_for_amqp, rabbit_hostname, rabbit_hostport,
                     'guest', 'guest',
                     extra_check_fn=self._get_assert_container_running_fn(
@@ -1298,17 +1307,17 @@ class Docker(object):
 
         try:
             del self.state['containers'][cluster]
             self.save_state()
         except KeyError:
             pass
 
     def build_all_images(self, verbose=False, use_last=False, mozreview=True,
-                         hgmo=True, bmo=True):
+                         hgmo=True, bmo=True, max_workers=None):
         docker_images = set()
         ansible_images = {}
         if mozreview:
             docker_images |= {
                 'autolanddb',
                 'autoland',
                 'bmoweb',
                 'ldap',
@@ -1331,25 +1340,23 @@ class Docker(object):
                 'bmoweb',
             }
 
         images = self.ensure_images_built(docker_images,
                                           ansibles=ansible_images,
                                           verbose=verbose,
                                           use_last=use_last)
 
-        with futures.ThreadPoolExecutor(3) as e:
+        with limited_threadpoolexecutor(3, max_workers) as e:
             if mozreview:
-                f_mr = e.submit(
-                    self.build_mozreview,
-                    images=images,
-                    verbose=verbose,
-                    use_last=use_last,
-                    build_hgweb=not hgmo,
-                    build_bmo=not bmo)
+                f_mr = e.submit(self.build_mozreview, images=images,
+                                verbose=verbose, use_last=use_last,
+                                build_hgweb=not hgmo,
+                                build_bmo=not bmo,
+                                max_workers=max_workers)
             if hgmo:
                 f_hgmo = e.submit(
                     self.build_hgmo,
                     images=images,
                     verbose=verbose,
                     use_last=use_last)
 
             if bmo:
--- a/testing/vcttesting/docker_mach_commands.py
+++ b/testing/vcttesting/docker_mach_commands.py
@@ -46,18 +46,21 @@ class DockerCommands(object):
 
     @Command('build-hgmo', category='docker',
         description='Build hg.mozilla.org Docker images')
     def build_hgmo(self):
         self.d.build_hgmo(verbose=True)
 
     @Command('build-mozreview', category='docker',
         description='Build Docker images required for MozReview')
-    def build_mozreview(self):
-        self.d.build_mozreview(verbose=True)
+    @CommandArgument('--forks', type=int,
+                     help='Number of parallel build processes to use. '
+                          '(default=unlimited)')
+    def build_mozreview(self, forks=None):
+        self.d.build_mozreview(verbose=True, max_workers=forks)
 
     @Command('start-bmo', category='docker',
         description='Start a bugzilla.mozilla.org instance')
     @CommandArgument('cluster', help='Name to give to this instance')
     @CommandArgument('http_port',
         help='HTTP port the server should be exposed on')
     @CommandArgument('--web-id-file',
         help='File to store the bmoweb container ID in')
@@ -81,18 +84,21 @@ class DockerCommands(object):
     @Command('build', category='docker',
              description='Build a single image')
     @CommandArgument('name', help='Name of image to build')
     def build(self, name):
         self.d.ensure_built(name, verbose=True)
 
     @Command('build-all', category='docker',
              description='Build all images')
-    def build_all(self):
-        self.d.build_all_images(verbose=True)
+    @CommandArgument('--forks', type=int,
+                     help='Number of parallel build processes to use. '
+                          '(default=unlimited)')
+    def build_all(self, forks=None):
+        self.d.build_all_images(verbose=True, max_workers=forks)
 
     @Command('run-ansible', category='docker',
              description='Run Ansible to produce a Docker image')
     @CommandArgument('playbook',
                      help='Name of Ansible playbook to execute')
     @CommandArgument('--builder',
                      help='Docker build to start from')
     @CommandArgument('--start-image',
--- a/testing/vcttesting/mozreview.py
+++ b/testing/vcttesting/mozreview.py
@@ -19,17 +19,17 @@ from vcttesting.bugzilla import Bugzilla
 from vcttesting.docker import (
     Docker,
     DockerNotAvailable,
     params_from_env,
 )
 from vcttesting.reviewboard import MozReviewBoard
 
 from .ldap import LDAP
-from .util import get_available_port
+from .util import get_available_port, limited_threadpoolexecutor
 
 HERE = os.path.abspath(os.path.dirname(__file__))
 ROOT = os.path.normpath(os.path.join(HERE, '..', '..'))
 
 
 SSH_CONFIG = '''
 Host *
   StrictHostKeyChecking no
@@ -145,23 +145,23 @@ class MozReview(object):
                               pulse_host=self.pulse_host,
                               pulse_port=self.pulse_port)
 
     def get_ldap(self):
         """Obtain an LDAP instance connected to the LDAP server in this instance."""
         return LDAP(self.ldap_uri, 'cn=admin,dc=mozilla', 'password')
 
     def start(self, bugzilla_port=None, reviewboard_port=None,
-            mercurial_port=None, pulse_port=None, verbose=False,
-            web_image=None, hgrb_image=None,
-            ldap_image=None, ldap_port=None, pulse_image=None,
-            rbweb_image=None, ssh_port=None,
-            hgweb_image=None, hgweb_port=None,
-            autolanddb_image=None, autoland_image=None, autoland_port=None,
-            treestatus_image=None, treestatus_port=None):
+              mercurial_port=None, pulse_port=None, verbose=False,
+              web_image=None, hgrb_image=None,
+              ldap_image=None, ldap_port=None, pulse_image=None,
+              rbweb_image=None, ssh_port=None,
+              hgweb_image=None, hgweb_port=None,
+              autolanddb_image=None, autoland_image=None, autoland_port=None,
+              treestatus_image=None, treestatus_port=None, max_workers=None):
         """Start a MozReview instance."""
         if self.started:
             raise Exception('MozReview instance has already been started')
 
         if not bugzilla_port:
             bugzilla_port = get_available_port()
         if not reviewboard_port:
             reviewboard_port = get_available_port()
@@ -210,16 +210,17 @@ class MozReview(object):
                 hg_port=mercurial_port,
                 autolanddb_image=autolanddb_image,
                 autoland_image=autoland_image,
                 autoland_port=autoland_port,
                 hgweb_image=hgweb_image,
                 hgweb_port=hgweb_port,
                 treestatus_image=treestatus_image,
                 treestatus_port=treestatus_port,
+                max_workers=max_workers,
                 verbose=verbose)
 
         self.bmoweb_id = mr_info['web_id']
 
         self.bugzilla_url = mr_info['bugzilla_url']
         bugzilla = self.get_bugzilla()
 
         self.reviewboard_url = mr_info['reviewboard_url']
@@ -259,17 +260,17 @@ class MozReview(object):
         rb.login_user(bugzilla.username, bugzilla.password)
 
         # Ensure the mozreview hg user is present and has privileges.
         # This has to occur after the admin user is logged in to avoid
         # race conditions with user IDs.
         rb.create_local_user(self.hg_rb_username, self.hg_rb_email,
                              self.hg_rb_password)
 
-        with futures.ThreadPoolExecutor(7) as e:
+        with limited_threadpoolexecutor(7, max_workers) as e:
             # Ensure admin user had admin privileges.
             e.submit(rb.make_admin, bugzilla.username)
 
             # Ensure mozreview user has permissions for testing.
             e.submit(rb.grant_permission, self.hg_rb_username,
                      'Can change ldap assocation for all users')
 
             e.submit(rb.grant_permission, self.hg_rb_username,
--- a/testing/vcttesting/mozreview_mach_commands.py
+++ b/testing/vcttesting/mozreview_mach_commands.py
@@ -54,31 +54,34 @@ class MozReviewCommands(object):
     @CommandArgument('--ldap-port', type=int,
                      help='Port LDAP server should listen on.')
     @CommandArgument('--ssh-port', type=int,
                      help='Port Mercurial SSH server should listen on.')
     @CommandArgument('--hgweb-port', type=int,
                      help='Port hg.mo HTTP server should listen on.')
     @CommandArgument('--treestatus-port', type=int,
                      help='Port treestatus HTTP server should listen on.')
+    @CommandArgument('--forks', type=int,
+                     help='Number of parallel build processes to use. (default=unlimited)')
     def start(self, where, bugzilla_port=None, reviewboard_port=None,
             mercurial_port=None, pulse_port=None, autoland_port=None,
             ldap_port=None, ssh_port=None, hgweb_port=None,
-            treestatus_port=None):
+            treestatus_port=None, forks=None):
         mr = self._get_mozreview(where)
         mr.start(bugzilla_port=bugzilla_port,
-                reviewboard_port=reviewboard_port,
-                mercurial_port=mercurial_port,
-                pulse_port=pulse_port,
-                autoland_port=autoland_port,
-                ldap_port=ldap_port,
-                ssh_port=ssh_port,
-                hgweb_port=hgweb_port,
-                treestatus_port=treestatus_port,
-                verbose=True)
+                 reviewboard_port=reviewboard_port,
+                 mercurial_port=mercurial_port,
+                 pulse_port=pulse_port,
+                 autoland_port=autoland_port,
+                 ldap_port=ldap_port,
+                 ssh_port=ssh_port,
+                 hgweb_port=hgweb_port,
+                 treestatus_port=treestatus_port,
+                 max_workers=forks,
+                 verbose=True)
 
         print('Bugzilla URL: %s' % mr.bugzilla_url)
         print('Review Board URL: %s' % mr.reviewboard_url)
         print('Mercurial RB URL: %s' % mr.mercurial_url)
         print('hg.mo URL: %s' % mr.hgweb_url)
         print('Pulse endpoint: %s:%s' % (mr.pulse_host, mr.pulse_port))
         print('Autoland URL: %s' % mr.autoland_url)
         print('Treestatus URL: %s' % mr.treestatus_url)
--- a/testing/vcttesting/util.py
+++ b/testing/vcttesting/util.py
@@ -3,16 +3,17 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, unicode_literals
 
 import os
 import socket
 import time
 
+import concurrent.futures as futures
 from kafka.client import KafkaClient
 import kombu
 import paramiko
 import requests
 
 
 HERE = os.path.abspath(os.path.dirname(__file__))
 ROOT = os.path.normpath(os.path.join(HERE, '..', '..'))
@@ -120,8 +121,37 @@ def wait_for_kafka(hostport, timeout=60)
             return
         except Exception:
             pass
 
         if time.time() - start > timeout:
             raise Exception('Timeout reached waiting for Kafka')
 
         time.sleep(0.1)
+
+
+def limited_threadpoolexecutor(wanted_workers, max_workers=None):
+    """Return a ThreadPoolExecutor with up to ``max_workers`` executors.
+
+    Call with ``wanted_workers`` equal to None to ask for the default number
+    of workers, which is the number of processors on the machine multiplied
+    by 5.
+
+    Call with ``max_workers`` less than 1 or ``max_workers=None`` to specify
+    no limit on worker threads.
+
+    See https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
+    """
+    # Are we trying to ask for default workers, which is the "number of
+    # processors on the machine, multiplied by 5"?
+    # See https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
+    wants_unlimited = (wanted_workers is None) or (wanted_workers < 1)
+
+    max_unlimited = (max_workers is None) or (max_workers < 1)
+
+    if max_unlimited:
+        workers = wanted_workers
+    elif wants_unlimited:
+        workers = max_workers
+    else:
+        workers = min(wanted_workers, max_workers)
+
+    return futures.ThreadPoolExecutor(workers)
\ No newline at end of file
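
Editor's note: a hedged usage sketch of the new helper; the worker counts assume the
implementation shown above, and the two calls mirror the pools patched in
docker.py and mozreview.py.

    from vcttesting.util import limited_threadpoolexecutor

    # No --forks given (max_workers=None): the caller's requested size wins,
    # so this pool gets 10 worker threads.
    with limited_threadpoolexecutor(10, max_workers=None) as e:
        pass

    # --forks 4 (max_workers=4): the same call site is capped at
    # min(10, 4) == 4 worker threads.
    with limited_threadpoolexecutor(10, max_workers=4) as e:
        pass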