Bug 1369711 - [mozlint] Refactor concurrent.futures ProcessPoolExecutor code for readability
This is primarily a readability refactor of the ProcessPoolExecutor code.
A done callback on each future invokes '_collect_results' in the parent
process for each WorkItem (either because it finished or because it was
canceled), and linter exceptions now mark the linter as failed rather
than propagating out of roll().
This also differentiates between setup failures and run failures.
MozReview-Commit-ID: 36Pe3bbUKmk
--- a/python/mozlint/mozlint/cli.py
+++ b/python/mozlint/mozlint/cli.py
@@ -170,17 +170,17 @@ def run(paths, linters, fmt, outgoing, w
if edit and results:
edit_results(results)
results = lint.roll(results.keys())
formatter = formatters.get(fmt)
# Encode output with 'replace' to avoid UnicodeEncodeErrors on
# environments that aren't using utf-8.
- out = formatter(results, failed=lint.failed).encode(
+ out = formatter(results, failed=lint.failed | lint.failed_setup).encode(
sys.stdout.encoding or 'ascii', 'replace')
if out:
print(out)
return 1 if results or lint.failed else 0
if __name__ == '__main__':
parser = MozlintParser()
--- a/python/mozlint/mozlint/roller.py
+++ b/python/mozlint/mozlint/roller.py
@@ -17,44 +17,38 @@ from subprocess import CalledProcessErro
from mozversioncontrol import get_repository_object, MissingUpstreamRepo, InvalidRepoPath
from .errors import LintersNotConfigured
from .parser import Parser
from .pathutils import findobject
from .types import supported_types
-def _run_linters(config, paths, **lintargs):
+def _run_worker(config, paths, **lintargs):
results = defaultdict(list)
failed = []
func = supported_types[config['type']]
- res = func(paths, config, **lintargs) or []
+ try:
+ res = func(paths, config, **lintargs) or []
+ except Exception:
+ traceback.print_exc()
+ res = 1
+ finally:
+ sys.stdout.flush()
if not isinstance(res, (list, tuple)):
if res:
failed.append(config['name'])
else:
for r in res:
results[r.path].append(r)
return results, failed
-def _run_worker(*args, **kwargs):
- try:
- return _run_linters(*args, **kwargs)
- except Exception:
- # multiprocessing seems to munge worker exceptions, print
- # it here so it isn't lost.
- traceback.print_exc()
- raise
- finally:
- sys.stdout.flush()
-
-
class LintRoller(object):
"""Registers and runs linters.
:param root: Path to which relative paths will be joined. If
unspecified, root will either be determined from
version control or cwd.
:param lintargs: Arguments to pass to the underlying linter(s).
"""
@@ -66,18 +60,21 @@ class LintRoller(object):
self.vcs = get_repository_object(root)
except InvalidRepoPath:
self.vcs = None
self.linters = []
self.lintargs = lintargs
self.lintargs['root'] = root
- # linters that return non-zero
- self.failed = set()
+ # result state
+ self.failed = None
+ self.failed_setup = None
+ self.results = None
+
self.root = root
def read(self, paths):
"""Parse one or more linters and add them to the registry.
:param paths: A path or iterable of paths to linter definitions.
"""
if isinstance(paths, basestring):
@@ -86,58 +83,72 @@ class LintRoller(object):
for path in paths:
self.linters.extend(self.parse(path))
def setup(self):
"""Run setup for applicable linters"""
if not self.linters:
raise LintersNotConfigured
- failed = set()
+ self.failed_setup = set()
for linter in self.linters:
if 'setup' not in linter:
continue
try:
res = findobject(linter['setup'])(self.root)
except Exception:
traceback.print_exc()
res = 1
if res:
- failed.add(linter['name'])
+ self.failed_setup.add(linter['name'])
- if failed:
- print("error: problem with lint setup, skipping {}".format(', '.join(sorted(failed))))
- self.linters = [l for l in self.linters if l['name'] not in failed]
- self.failed.update(failed)
+ if self.failed_setup:
+ print("error: problem with lint setup, skipping {}".format(
+ ', '.join(sorted(self.failed_setup))))
+ self.linters = [l for l in self.linters if l['name'] not in self.failed_setup]
return 1
return 0
def _generate_jobs(self, paths, num_procs):
"""A job is of the form (<linter:dict>, <paths:list>)."""
chunk_size = min(self.MAX_PATHS_PER_JOB, int(ceil(float(len(paths)) / num_procs)))
while paths:
for linter in self.linters:
yield linter, paths[:chunk_size]
paths = paths[chunk_size:]
+ def _collect_results(self, future):
+ if future.cancelled():
+ return
+
+ results, failed = future.result()
+ if failed:
+ self.failed.update(set(failed))
+ for k, v in results.iteritems():
+ self.results[k].extend(v)
+
def roll(self, paths=None, outgoing=None, workdir=None, num_procs=None):
"""Run all of the registered linters against the specified file paths.
:param paths: An iterable of files and/or directories to lint.
:param outgoing: Lint files touched by commits that are not on the remote repository.
:param workdir: Lint all files touched in the working directory.
:param num_procs: The number of processes to use. Default: cpu count
:return: A dictionary with file names as the key, and a list of
:class:`~result.ResultContainer`s as the value.
"""
if not self.linters:
raise LintersNotConfigured
+ # reset result state
+ self.results = defaultdict(list)
+ self.failed = set()
+
# Need to use a set in case vcs operations specify the same file
# more than once.
paths = paths or set()
if isinstance(paths, basestring):
paths = set([paths])
elif isinstance(paths, (list, tuple)):
paths = set(paths)
@@ -165,24 +176,27 @@ class LintRoller(object):
paths = paths or ['.']
# This will convert paths back to a list, but that's ok since
# we're done adding to it.
paths = map(os.path.abspath, paths)
num_procs = num_procs or cpu_count()
- all_results = defaultdict(list)
- with ProcessPoolExecutor(num_procs) as executor:
- futures = [executor.submit(_run_worker, config, p, **self.lintargs)
- for config, p in self._generate_jobs(paths, num_procs)]
- # ignore SIGINT in parent so we can still get partial results
- # from child processes. These should shutdown quickly anyway.
- orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
- for future in futures:
- results, failed = future.result()
- if failed:
- self.failed.update(set(failed))
- for k, v in results.iteritems():
- all_results[k].extend(v)
+ jobs = list(self._generate_jobs(paths, num_procs))
+
+ # Make sure we never spawn more processes than we have jobs.
+ num_procs = min(len(jobs), num_procs)
+
+ executor = ProcessPoolExecutor(num_procs)
+ # Submit jobs to the worker pool. The _collect_results method will be
+ # called when a job is finished.
+ for job in jobs:
+ future = executor.submit(_run_worker, *job, **self.lintargs)
+ future.add_done_callback(self._collect_results)
+
+ # Ignore SIGINT in parent so we can still get partial results
+ # from child processes. These should shut down quickly anyway.
+ orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
+ executor.shutdown() # blocks until all workers have finished
signal.signal(signal.SIGINT, orig_sigint)
- return all_results
+ return self.results
--- a/python/mozlint/test/test_cli.py
+++ b/python/mozlint/test/test_cli.py
@@ -44,20 +44,21 @@ def test_cli_run_with_fix(run, capfd):
@pytest.mark.skipif(not find_executable("echo"), reason="No `echo` executable found.")
def test_cli_run_with_edit(run, parser, capfd):
os.environ['EDITOR'] = 'echo'
ret = run(['-f', 'compact', '--edit', '--linter', 'external'])
out, err = capfd.readouterr()
out = out.splitlines()
assert ret == 1
- assert len(out) == 5
assert out[0].endswith('foobar.js') # from the `echo` editor
assert "foobar.js: line 1, col 1, Error" in out[1]
assert "foobar.js: line 2, col 1, Error" in out[2]
+ assert "2 problems" in out[-1]
+ assert len(out) == 5
del os.environ['EDITOR']
with pytest.raises(SystemExit):
parser.parse_args(['--edit'])
def test_cli_run_with_setup(run, capfd):
# implicitly call setup
--- a/python/mozlint/test/test_roller.py
+++ b/python/mozlint/test/test_roller.py
@@ -7,58 +7,57 @@ from __future__ import absolute_import
import os
import platform
import sys
import mozunit
import pytest
from mozlint import ResultContainer
-from mozlint.errors import LintersNotConfigured, LintException
+from mozlint.errors import LintersNotConfigured
here = os.path.abspath(os.path.dirname(__file__))
linters = ('string.yml', 'regex.yml', 'external.yml')
def test_roll_no_linters_configured(lint, files):
with pytest.raises(LintersNotConfigured):
lint.roll(files)
def test_roll_successful(lint, linters, files):
lint.read(linters)
+ assert lint.results is None
result = lint.roll(files)
assert len(result) == 1
+ assert lint.results == result
assert lint.failed == set([])
path = result.keys()[0]
assert os.path.basename(path) == 'foobar.js'
errors = result[path]
assert isinstance(errors, list)
assert len(errors) == 6
container = errors[0]
assert isinstance(container, ResultContainer)
assert container.rule == 'no-foobar'
-def test_roll_catch_exception(lint, lintdir, files):
+def test_roll_catch_exception(lint, lintdir, files, capfd):
lint.read(os.path.join(lintdir, 'raises.yml'))
- # suppress printed traceback from test output
- old_stderr = sys.stderr
- sys.stderr = open(os.devnull, 'w')
- with pytest.raises(LintException):
- lint.roll(files)
- sys.stderr = old_stderr
+ lint.roll(files) # assert not raises
+ out, err = capfd.readouterr()
+ assert 'LintException' in err
def test_roll_with_excluded_path(lint, linters, files):
lint.lintargs.update({'exclude': ['**/foobar.js']})
lint.read(linters)
result = lint.roll(files)
@@ -71,46 +70,46 @@ def test_roll_with_invalid_extension(lin
result = lint.roll(os.path.join(filedir, 'foobar.py'))
assert len(result) == 0
assert lint.failed == set([])
def test_roll_with_failure_code(lint, lintdir, files):
lint.read(os.path.join(lintdir, 'badreturncode.yml'))
- assert lint.failed == set([])
+ assert lint.failed is None
result = lint.roll(files, num_procs=1)
assert len(result) == 0
assert lint.failed == set(['BadReturnCodeLinter'])
-def fake_run_linters(config, paths, **lintargs):
+def fake_run_worker(config, paths, **lintargs):
return {'count': [1]}, []
@pytest.mark.skipif(platform.system() == 'Windows',
reason="monkeypatch issues with multiprocessing on Windows")
@pytest.mark.parametrize('num_procs', [1, 4, 8, 16])
def test_number_of_jobs(monkeypatch, lint, linters, files, num_procs):
- monkeypatch.setattr(sys.modules[lint.__module__], '_run_linters', fake_run_linters)
+ monkeypatch.setattr(sys.modules[lint.__module__], '_run_worker', fake_run_worker)
lint.read(linters)
num_jobs = len(lint.roll(files, num_procs=num_procs)['count'])
if len(files) >= num_procs:
assert num_jobs == num_procs * len(linters)
else:
assert num_jobs == len(files) * len(linters)
@pytest.mark.skipif(platform.system() == 'Windows',
reason="monkeypatch issues with multiprocessing on Windows")
@pytest.mark.parametrize('max_paths,expected_jobs', [(1, 12), (4, 6), (16, 6)])
def test_max_paths_per_job(monkeypatch, lint, linters, files, max_paths, expected_jobs):
- monkeypatch.setattr(sys.modules[lint.__module__], '_run_linters', fake_run_linters)
+ monkeypatch.setattr(sys.modules[lint.__module__], '_run_worker', fake_run_worker)
files = files[:4]
assert len(files) == 4
linters = linters[:3]
assert len(linters) == 3
lint.MAX_PATHS_PER_JOB = max_paths
@@ -128,13 +127,13 @@ def test_setup(lint, linters, filedir, c
lint.read(linters)
lint.setup()
out, err = capfd.readouterr()
assert 'setup passed' in out
assert 'setup failed' in out
assert 'setup raised' in out
assert 'error: problem with lint setup, skipping' in out
- assert lint.failed == set(['SetupFailedLinter', 'SetupRaisedLinter'])
+ assert lint.failed_setup == set(['SetupFailedLinter', 'SetupRaisedLinter'])
if __name__ == '__main__':
mozunit.main()