--- a/testing/vcttesting/docker.py
+++ b/testing/vcttesting/docker.py
@@ -35,16 +35,17 @@ from docker.errors import (
)
from contextlib import contextmanager
from io import BytesIO
import concurrent.futures as futures
from coverage.data import CoverageData
from .util import (
+ limited_threadpoolexecutor,
wait_for_amqp,
wait_for_http,
wait_for_ssh,
)
from .vctutil import (
get_and_write_vct_node,
)
@@ -332,17 +333,18 @@ class Docker(object):
res = self.client.import_image_from_data(
fh, repository=repository, tag=tag)
# docker-py doesn't parse the JSON response in what is almost
# certainly a bug. Do it ourselves.
return json.loads(res.strip())['status']
def ensure_built(self, name, verbose=False, use_last=False):
- """Ensure a Docker image from a builder directory is built and up to date.
+ """Ensure a Docker image from a builder directory is built and up to
+ date.
This function is docker build++. Under the hood, it talks to the same
``build`` Docker API. However, it does one important thing differently:
it builds the context archive manually.
We supplement all contexts with the content of the source in this
repository related to building Docker containers. This is done by
scanning the Dockerfile for references to extra files to include.
@@ -496,27 +498,32 @@ class Docker(object):
if not have_tag:
self.client.tag(full_image, name, str(uuid.uuid1()))
return full_image
raise Exception('Unable to confirm image was built: %s' % name)
def ensure_images_built(self, names, ansibles=None, existing=None,
- verbose=False, use_last=False):
+ verbose=False, use_last=False, max_workers=None):
"""Ensure that multiple images are built.
``names`` is a list of Docker images to build.
``ansibles`` describes how to build ansible-based images. Keys
are repositories. Values are tuples of (playbook, builder). If an
image in the specified repositories is found, we'll use it as the
start image. Otherwise, we'll use the configured builder.
If ``use_last`` is true, we will use the last built image instead
of building a new one.
+
+ If ``max_workers`` is less than 1 or is None, use the default number
+ of worker threads to perform I/O intensive tasks. Otherwise use the
+ specified number of threads. Useful for debugging and reducing
+ load on resource-constrained machines.
"""
ansibles = ansibles or {}
existing = existing or {}
# Verify existing images actually exist.
docker_images = self.all_docker_images()
images = {k: v for k, v in existing.items() if v in docker_images}
@@ -565,17 +572,17 @@ class Docker(object):
repository=repository,
builder=builder,
start_image=start_image,
vct_cid=vct_cid,
verbose=verbose)
return repository, image
with self.vct_container(verbose=verbose) as vct_state, \
- futures.ThreadPoolExecutor(len(missing)) as e:
+ limited_threadpoolexecutor(len(missing), max_workers) as e:
vct_cid = vct_state['Id']
fs = []
builder_fs = {}
for n in sorted(missing):
if n in names:
fs.append(e.submit(build, n, verbose=verbose))
else:
playbook, builder = ansibles[n]
@@ -751,54 +758,55 @@ class Docker(object):
self.state['last-bmoweb-bootstrap-id'] = bmoweb_bootstrap
self.save_state()
return {
'bmoweb': bmoweb_bootstrap,
}
def build_mozreview(self, images=None, verbose=False, use_last=False,
- build_hgweb=True, build_bmo=True):
+ build_hgweb=True, build_bmo=True, max_workers=None):
"""Ensure the images for a MozReview service are built.
bmoweb's entrypoint does a lot of setup on first run. This takes many
seconds to perform and this cost is unacceptable for efficient test
running. So, when we build the BMO images, we create throwaway
containers and commit the results to a new image. This allows us to
spin up multiple bmoweb containers very quickly.
"""
images = images or {}
# Building BMO images is a 2 phase step: image build + bootstrap.
# Because bootstrap can occur concurrently with other image
# building, we build BMO images separately and initiate bootstrap
# as soon as it is ready.
- with futures.ThreadPoolExecutor(2) as e:
+ with limited_threadpoolexecutor(2, max_workers) as e:
if build_bmo:
f_bmo_images = e.submit(self.build_bmo, images=images,
verbose=verbose)
ansibles = {
'hgrb': ('docker-hgrb', 'centos6'),
'rbweb': ('docker-rbweb', 'centos6'),
}
if build_hgweb:
ansibles['hgweb'] = ('docker-hgweb', 'centos6')
- f_images = e.submit(
- self.ensure_images_built,
- [
- 'autolanddb',
- 'autoland',
- 'ldap',
- 'pulse',
- 'treestatus',
- ],
- ansibles=ansibles, existing=images, verbose=verbose,
- use_last=use_last)
+ f_images = e.submit(self.ensure_images_built, [
+ 'autolanddb',
+ 'autoland',
+ 'ldap',
+ 'pulse',
+ 'treestatus',
+ ],
+ ansibles=ansibles,
+ existing=images,
+ verbose=verbose,
+ use_last=use_last,
+ max_workers=max_workers)
if build_bmo:
bmo_images = f_bmo_images.result()
bmoweb_bootstrap = bmo_images['bmoweb']
images.update(f_images.result())
self.state['last-autolanddb-id'] = images['autolanddb']
@@ -916,17 +924,17 @@ class Docker(object):
def start_mozreview(
self, cluster, http_port=80,
hgrb_image=None, ldap_image=None, ldap_port=None, pulse_port=None,
rbweb_port=None, web_image=None, pulse_image=None,
rbweb_image=None, ssh_port=None, hg_port=None,
autolanddb_image=None, autoland_image=None, autoland_port=None,
hgweb_image=None, hgweb_port=None,
treestatus_image=None, treestatus_port=None,
- verbose=False):
+ max_workers=None, verbose=False):
start_ldap = False
if ldap_port:
start_ldap = True
start_hgrb = False
if ssh_port or hg_port:
start_hgrb = True
@@ -972,30 +980,31 @@ class Docker(object):
hgweb_image = None
if treestatus_image and treestatus_image not in known_images:
treestatus_image = None
if (not web_image or not hgrb_image or not ldap_image
or not pulse_image or not autolanddb_image
or not autoland_image or not rbweb_image or not hgweb_image
or not treestatus_image):
- images = self.build_mozreview(verbose=verbose)
+ images = self.build_mozreview(
+ max_workers=max_workers, verbose=verbose)
autolanddb_image = images['autolanddb']
autoland_image = images['autoland']
hgrb_image = images['hgrb']
ldap_image = images['ldap']
web_image = images['bmoweb']
pulse_image = images['pulse']
rbweb_image = images['rbweb']
hgweb_image = images['hgweb']
treestatus_image = images['treestatus']
containers = self.state['containers'].setdefault(cluster, [])
- with futures.ThreadPoolExecutor(10) as e:
+ with limited_threadpoolexecutor(10, max_workers) as e:
if start_pulse:
f_pulse_create = e.submit(
self.client.create_container,
pulse_image,
labels=['pulse'])
bmo_url = 'http://%s:%s/' % (self.docker_hostname, http_port)
@@ -1190,17 +1199,17 @@ class Docker(object):
hgweb_hostname, hgweb_hostport = \
self._get_host_hostname_port(hgweb_state, '80/tcp')
if start_treestatus:
treestatus_hostname, treestatus_hostport = \
self._get_host_hostname_port(treestatus_state, '80/tcp')
fs = []
- with futures.ThreadPoolExecutor(7) as e:
+ with limited_threadpoolexecutor(7, max_workers) as e:
fs.append(e.submit(
wait_for_http, bmoweb_hostname, bmoweb_hostport,
extra_check_fn=self._get_assert_container_running_fn(web_id)))
if start_pulse:
fs.append(e.submit(
wait_for_amqp, rabbit_hostname, rabbit_hostport,
'guest', 'guest',
extra_check_fn=self._get_assert_container_running_fn(
@@ -1298,17 +1307,17 @@ class Docker(object):
try:
del self.state['containers'][cluster]
self.save_state()
except KeyError:
pass
def build_all_images(self, verbose=False, use_last=False, mozreview=True,
- hgmo=True, bmo=True):
+ hgmo=True, bmo=True, max_workers=None):
docker_images = set()
ansible_images = {}
if mozreview:
docker_images |= {
'autolanddb',
'autoland',
'bmoweb',
'ldap',
@@ -1331,25 +1340,23 @@ class Docker(object):
'bmoweb',
}
images = self.ensure_images_built(docker_images,
ansibles=ansible_images,
verbose=verbose,
use_last=use_last)
- with futures.ThreadPoolExecutor(3) as e:
+ with limited_threadpoolexecutor(3, max_workers) as e:
if mozreview:
- f_mr = e.submit(
- self.build_mozreview,
- images=images,
- verbose=verbose,
- use_last=use_last,
- build_hgweb=not hgmo,
- build_bmo=not bmo)
+ f_mr = e.submit(self.build_mozreview, images=images,
+ verbose=verbose, use_last=use_last,
+ build_hgweb=not hgmo,
+ build_bmo=not bmo,
+ max_workers=max_workers)
if hgmo:
f_hgmo = e.submit(
self.build_hgmo,
images=images,
verbose=verbose,
use_last=use_last)
if bmo:
--- a/testing/vcttesting/docker_mach_commands.py
+++ b/testing/vcttesting/docker_mach_commands.py
@@ -46,18 +46,21 @@ class DockerCommands(object):
@Command('build-hgmo', category='docker',
description='Build hg.mozilla.org Docker images')
def build_hgmo(self):
self.d.build_hgmo(verbose=True)
@Command('build-mozreview', category='docker',
description='Build Docker images required for MozReview')
- def build_mozreview(self):
- self.d.build_mozreview(verbose=True)
+ @CommandArgument('--forks', type=int,
+ help='Number of parallel build processes to use. '
+ '(default=unlimited)')
+ def build_mozreview(self, forks=None):
+ self.d.build_mozreview(verbose=True, max_workers=forks)
@Command('start-bmo', category='docker',
description='Start a bugzilla.mozilla.org instance')
@CommandArgument('cluster', help='Name to give to this instance')
@CommandArgument('http_port',
help='HTTP port the server should be exposed on')
@CommandArgument('--web-id-file',
help='File to store the bmoweb container ID in')
@@ -81,18 +84,21 @@ class DockerCommands(object):
@Command('build', category='docker',
description='Build a single image')
@CommandArgument('name', help='Name of image to build')
def build(self, name):
self.d.ensure_built(name, verbose=True)
@Command('build-all', category='docker',
description='Build all images')
- def build_all(self):
- self.d.build_all_images(verbose=True)
+ @CommandArgument('--forks', type=int,
+ help='Number of parallel build processes to use. '
+ '(default=unlimited)')
+ def build_all(self, forks=None):
+ self.d.build_all_images(verbose=True, max_workers=forks)
@Command('run-ansible', category='docker',
description='Run Ansible to produce a Docker image')
@CommandArgument('playbook',
help='Name of Ansible playbook to execute')
@CommandArgument('--builder',
help='Docker build to start from')
@CommandArgument('--start-image',
--- a/testing/vcttesting/mozreview.py
+++ b/testing/vcttesting/mozreview.py
@@ -19,17 +19,17 @@ from vcttesting.bugzilla import Bugzilla
from vcttesting.docker import (
Docker,
DockerNotAvailable,
params_from_env,
)
from vcttesting.reviewboard import MozReviewBoard
from .ldap import LDAP
-from .util import get_available_port
+from .util import get_available_port, limited_threadpoolexecutor
HERE = os.path.abspath(os.path.dirname(__file__))
ROOT = os.path.normpath(os.path.join(HERE, '..', '..'))
SSH_CONFIG = '''
Host *
StrictHostKeyChecking no
@@ -145,23 +145,23 @@ class MozReview(object):
pulse_host=self.pulse_host,
pulse_port=self.pulse_port)
def get_ldap(self):
"""Obtain an LDAP instance connected to the LDAP server in this instance."""
return LDAP(self.ldap_uri, 'cn=admin,dc=mozilla', 'password')
def start(self, bugzilla_port=None, reviewboard_port=None,
- mercurial_port=None, pulse_port=None, verbose=False,
- web_image=None, hgrb_image=None,
- ldap_image=None, ldap_port=None, pulse_image=None,
- rbweb_image=None, ssh_port=None,
- hgweb_image=None, hgweb_port=None,
- autolanddb_image=None, autoland_image=None, autoland_port=None,
- treestatus_image=None, treestatus_port=None):
+ mercurial_port=None, pulse_port=None, verbose=False,
+ web_image=None, hgrb_image=None,
+ ldap_image=None, ldap_port=None, pulse_image=None,
+ rbweb_image=None, ssh_port=None,
+ hgweb_image=None, hgweb_port=None,
+ autolanddb_image=None, autoland_image=None, autoland_port=None,
+ treestatus_image=None, treestatus_port=None, max_workers=None):
"""Start a MozReview instance."""
if self.started:
raise Exception('MozReview instance has already been started')
if not bugzilla_port:
bugzilla_port = get_available_port()
if not reviewboard_port:
reviewboard_port = get_available_port()
@@ -210,16 +210,17 @@ class MozReview(object):
hg_port=mercurial_port,
autolanddb_image=autolanddb_image,
autoland_image=autoland_image,
autoland_port=autoland_port,
hgweb_image=hgweb_image,
hgweb_port=hgweb_port,
treestatus_image=treestatus_image,
treestatus_port=treestatus_port,
+ max_workers=max_workers,
verbose=verbose)
self.bmoweb_id = mr_info['web_id']
self.bugzilla_url = mr_info['bugzilla_url']
bugzilla = self.get_bugzilla()
self.reviewboard_url = mr_info['reviewboard_url']
@@ -259,17 +260,17 @@ class MozReview(object):
rb.login_user(bugzilla.username, bugzilla.password)
# Ensure the mozreview hg user is present and has privileges.
# This has to occur after the admin user is logged in to avoid
# race conditions with user IDs.
rb.create_local_user(self.hg_rb_username, self.hg_rb_email,
self.hg_rb_password)
- with futures.ThreadPoolExecutor(7) as e:
+ with limited_threadpoolexecutor(7, max_workers) as e:
# Ensure admin user had admin privileges.
e.submit(rb.make_admin, bugzilla.username)
# Ensure mozreview user has permissions for testing.
e.submit(rb.grant_permission, self.hg_rb_username,
'Can change ldap assocation for all users')
e.submit(rb.grant_permission, self.hg_rb_username,
--- a/testing/vcttesting/mozreview_mach_commands.py
+++ b/testing/vcttesting/mozreview_mach_commands.py
@@ -54,31 +54,34 @@ class MozReviewCommands(object):
@CommandArgument('--ldap-port', type=int,
help='Port LDAP server should listen on.')
@CommandArgument('--ssh-port', type=int,
help='Port Mercurial SSH server should listen on.')
@CommandArgument('--hgweb-port', type=int,
help='Port hg.mo HTTP server should listen on.')
@CommandArgument('--treestatus-port', type=int,
help='Port treestatus HTTP server should listen on.')
+ @CommandArgument('--forks', type=int,
+ help='Number of parallel build processes to use. (default=unlimited)')
def start(self, where, bugzilla_port=None, reviewboard_port=None,
mercurial_port=None, pulse_port=None, autoland_port=None,
ldap_port=None, ssh_port=None, hgweb_port=None,
- treestatus_port=None):
+ treestatus_port=None, forks=None):
mr = self._get_mozreview(where)
mr.start(bugzilla_port=bugzilla_port,
- reviewboard_port=reviewboard_port,
- mercurial_port=mercurial_port,
- pulse_port=pulse_port,
- autoland_port=autoland_port,
- ldap_port=ldap_port,
- ssh_port=ssh_port,
- hgweb_port=hgweb_port,
- treestatus_port=treestatus_port,
- verbose=True)
+ reviewboard_port=reviewboard_port,
+ mercurial_port=mercurial_port,
+ pulse_port=pulse_port,
+ autoland_port=autoland_port,
+ ldap_port=ldap_port,
+ ssh_port=ssh_port,
+ hgweb_port=hgweb_port,
+ treestatus_port=treestatus_port,
+ max_workers=forks,
+ verbose=True)
print('Bugzilla URL: %s' % mr.bugzilla_url)
print('Review Board URL: %s' % mr.reviewboard_url)
print('Mercurial RB URL: %s' % mr.mercurial_url)
print('hg.mo URL: %s' % mr.hgweb_url)
print('Pulse endpoint: %s:%s' % (mr.pulse_host, mr.pulse_port))
print('Autoland URL: %s' % mr.autoland_url)
print('Treestatus URL: %s' % mr.treestatus_url)
--- a/testing/vcttesting/util.py
+++ b/testing/vcttesting/util.py
@@ -3,16 +3,17 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, unicode_literals
import os
import socket
import time
+import concurrent.futures as futures
from kafka.client import KafkaClient
import kombu
import paramiko
import requests
HERE = os.path.abspath(os.path.dirname(__file__))
ROOT = os.path.normpath(os.path.join(HERE, '..', '..'))
@@ -120,8 +121,37 @@ def wait_for_kafka(hostport, timeout=60)
return
except Exception:
pass
if time.time() - start > timeout:
raise Exception('Timeout reached waiting for Kafka')
time.sleep(0.1)
+
+
def limited_threadpoolexecutor(wanted_workers, max_workers=None):
    """Return a ThreadPoolExecutor with up to ``max_workers`` workers.

    ``wanted_workers`` is the number of threads the caller would use if
    there were no limit. ``max_workers`` caps that number. For either
    argument, ``None`` or any value less than 1 means "no preference":

    * both unlimited -> the executor uses the library default worker
      count (number of processors on the machine multiplied by 5).
    * exactly one limited -> the limited value is used.
    * both limited -> the smaller of the two is used.

    See https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
    """
    wants_unlimited = wanted_workers is None or wanted_workers < 1
    max_unlimited = max_workers is None or max_workers < 1

    if max_unlimited:
        workers = wanted_workers
    elif wants_unlimited:
        workers = max_workers
    else:
        workers = min(wanted_workers, max_workers)

    # ThreadPoolExecutor raises ValueError for max_workers < 1; normalize
    # any remaining "unlimited" sentinel (None, 0, negative) to None so
    # the library default is used instead of crashing when, e.g., a
    # caller passes len(some_empty_collection).
    if workers is not None and workers < 1:
        workers = None

    return futures.ThreadPoolExecutor(workers)