Bug 1369711 - [mozlint] Refactor concurrent.futures ProcessPoolExecutor code for readability [draft]
author: Andrew Halberstadt <ahalberstadt@mozilla.com>
Fri, 23 Feb 2018 09:02:16 -0500
changeset 767031 ca8f497d2eb9d5b1dd6459288cc963da569b5c51
parent 766824 8f1b2f872f0ea358a0412eb8b8687f08d47f6621
child 767032 f334d8f2335460393732d4457f3ebee6eafd33de
child 768899 66dff277330ff24eefbf59feb88540d5dc7e415f
push id: 102489
push user: ahalberstadt@mozilla.com
push date: Tue, 13 Mar 2018 20:36:25 +0000
bugs: 1369711
milestone: 61.0a1
Bug 1369711 - [mozlint] Refactor concurrent.futures ProcessPoolExecutor code for readability

This commit doesn't change any behaviour; it just attempts to make this a little more readable. The workers will call '_collect_results' for each WorkItem they process (either because it finished or because it was canceled). This also differentiates between setup failures and run failures.

MozReview-Commit-ID: 36Pe3bbUKmk
python/mozlint/mozlint/cli.py
python/mozlint/mozlint/roller.py
python/mozlint/test/test_cli.py
python/mozlint/test/test_roller.py
--- a/python/mozlint/mozlint/cli.py
+++ b/python/mozlint/mozlint/cli.py
@@ -170,17 +170,17 @@ def run(paths, linters, fmt, outgoing, w
     if edit and results:
         edit_results(results)
         results = lint.roll(results.keys())
 
     formatter = formatters.get(fmt)
 
     # Encode output with 'replace' to avoid UnicodeEncodeErrors on
     # environments that aren't using utf-8.
-    out = formatter(results, failed=lint.failed).encode(
+    out = formatter(results, failed=lint.failed | lint.failed_setup).encode(
                     sys.stdout.encoding or 'ascii', 'replace')
     if out:
         print(out)
     return 1 if results or lint.failed else 0
 
 
 if __name__ == '__main__':
     parser = MozlintParser()
--- a/python/mozlint/mozlint/roller.py
+++ b/python/mozlint/mozlint/roller.py
@@ -17,44 +17,38 @@ from subprocess import CalledProcessErro
 from mozversioncontrol import get_repository_object, MissingUpstreamRepo, InvalidRepoPath
 
 from .errors import LintersNotConfigured
 from .parser import Parser
 from .pathutils import findobject
 from .types import supported_types
 
 
-def _run_linters(config, paths, **lintargs):
+def _run_worker(config, paths, **lintargs):
     results = defaultdict(list)
     failed = []
 
     func = supported_types[config['type']]
-    res = func(paths, config, **lintargs) or []
+    try:
+        res = func(paths, config, **lintargs) or []
+    except Exception:
+        traceback.print_exc()
+        res = 1
+    finally:
+        sys.stdout.flush()
 
     if not isinstance(res, (list, tuple)):
         if res:
             failed.append(config['name'])
     else:
         for r in res:
             results[r.path].append(r)
     return results, failed
 
 
-def _run_worker(*args, **kwargs):
-    try:
-        return _run_linters(*args, **kwargs)
-    except Exception:
-        # multiprocessing seems to munge worker exceptions, print
-        # it here so it isn't lost.
-        traceback.print_exc()
-        raise
-    finally:
-        sys.stdout.flush()
-
-
 class LintRoller(object):
     """Registers and runs linters.
 
     :param root: Path to which relative paths will be joined. If
                  unspecified, root will either be determined from
                  version control or cwd.
     :param lintargs: Arguments to pass to the underlying linter(s).
     """
@@ -66,18 +60,21 @@ class LintRoller(object):
             self.vcs = get_repository_object(root)
         except InvalidRepoPath:
             self.vcs = None
 
         self.linters = []
         self.lintargs = lintargs
         self.lintargs['root'] = root
 
-        # linters that return non-zero
-        self.failed = set()
+        # result state
+        self.failed = None
+        self.failed_setup = None
+        self.results = None
+
         self.root = root
 
     def read(self, paths):
         """Parse one or more linters and add them to the registry.
 
         :param paths: A path or iterable of paths to linter definitions.
         """
         if isinstance(paths, basestring):
@@ -86,58 +83,72 @@ class LintRoller(object):
         for path in paths:
             self.linters.extend(self.parse(path))
 
     def setup(self):
         """Run setup for applicable linters"""
         if not self.linters:
             raise LintersNotConfigured
 
-        failed = set()
+        self.failed_setup = set()
         for linter in self.linters:
             if 'setup' not in linter:
                 continue
 
             try:
                 res = findobject(linter['setup'])(self.root)
             except Exception:
                 traceback.print_exc()
                 res = 1
 
             if res:
-                failed.add(linter['name'])
+                self.failed_setup.add(linter['name'])
 
-        if failed:
-            print("error: problem with lint setup, skipping {}".format(', '.join(sorted(failed))))
-            self.linters = [l for l in self.linters if l['name'] not in failed]
-            self.failed.update(failed)
+        if self.failed_setup:
+            print("error: problem with lint setup, skipping {}".format(
+                    ', '.join(sorted(self.failed_setup))))
+            self.linters = [l for l in self.linters if l['name'] not in self.failed_setup]
             return 1
         return 0
 
     def _generate_jobs(self, paths, num_procs):
         """A job is of the form (<linter:dict>, <paths:list>)."""
         chunk_size = min(self.MAX_PATHS_PER_JOB, int(ceil(float(len(paths)) / num_procs)))
         while paths:
             for linter in self.linters:
                 yield linter, paths[:chunk_size]
             paths = paths[chunk_size:]
 
+    def _collect_results(self, future):
+        if future.cancelled():
+            return
+
+        results, failed = future.result()
+        if failed:
+            self.failed.update(set(failed))
+        for k, v in results.iteritems():
+            self.results[k].extend(v)
+
     def roll(self, paths=None, outgoing=None, workdir=None, num_procs=None):
         """Run all of the registered linters against the specified file paths.
 
         :param paths: An iterable of files and/or directories to lint.
         :param outgoing: Lint files touched by commits that are not on the remote repository.
         :param workdir: Lint all files touched in the working directory.
         :param num_procs: The number of processes to use. Default: cpu count
         :return: A dictionary with file names as the key, and a list of
                  :class:`~result.ResultContainer`s as the value.
         """
         if not self.linters:
             raise LintersNotConfigured
 
+        # reset result state
+        self.results = defaultdict(list)
+        self.failed = set()
+
         # Need to use a set in case vcs operations specify the same file
         # more than once.
         paths = paths or set()
         if isinstance(paths, basestring):
             paths = set([paths])
         elif isinstance(paths, (list, tuple)):
             paths = set(paths)
 
@@ -165,24 +176,27 @@ class LintRoller(object):
 
         paths = paths or ['.']
 
         # This will convert paths back to a list, but that's ok since
         # we're done adding to it.
         paths = map(os.path.abspath, paths)
 
         num_procs = num_procs or cpu_count()
-        all_results = defaultdict(list)
-        with ProcessPoolExecutor(num_procs) as executor:
-            futures = [executor.submit(_run_worker, config, p, **self.lintargs)
-                       for config, p in self._generate_jobs(paths, num_procs)]
-            # ignore SIGINT in parent so we can still get partial results
-            # from child processes. These should shutdown quickly anyway.
-            orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
-            for future in futures:
-                results, failed = future.result()
-                if failed:
-                    self.failed.update(set(failed))
-                for k, v in results.iteritems():
-                    all_results[k].extend(v)
+        jobs = list(self._generate_jobs(paths, num_procs))
+
+        # Make sure we never spawn more processes than we have jobs.
+        num_procs = min(len(jobs), num_procs)
+
+        executor = ProcessPoolExecutor(num_procs)
 
+        # Submit jobs to the worker pool. The _collect_results method will be
+        # called when a job is finished.
+        for job in jobs:
+            future = executor.submit(_run_worker, *job, **self.lintargs)
+            future.add_done_callback(self._collect_results)
+
+        # Ignore SIGINT in parent so we can still get partial results
+        # from child processes. These should shutdown quickly anyway.
+        orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
+        executor.shutdown()  # blocks until all workers have finished
         signal.signal(signal.SIGINT, orig_sigint)
-        return all_results
+        return self.results
--- a/python/mozlint/test/test_cli.py
+++ b/python/mozlint/test/test_cli.py
@@ -44,20 +44,21 @@ def test_cli_run_with_fix(run, capfd):
 @pytest.mark.skipif(not find_executable("echo"), reason="No `echo` executable found.")
 def test_cli_run_with_edit(run, parser, capfd):
     os.environ['EDITOR'] = 'echo'
 
     ret = run(['-f', 'compact', '--edit', '--linter', 'external'])
     out, err = capfd.readouterr()
     out = out.splitlines()
     assert ret == 1
-    assert len(out) == 5
     assert out[0].endswith('foobar.js')  # from the `echo` editor
     assert "foobar.js: line 1, col 1, Error" in out[1]
     assert "foobar.js: line 2, col 1, Error" in out[2]
+    assert "2 problems" in out[-1]
+    assert len(out) == 5
 
     del os.environ['EDITOR']
     with pytest.raises(SystemExit):
         parser.parse_args(['--edit'])
 
 
 def test_cli_run_with_setup(run, capfd):
     # implicitly call setup
--- a/python/mozlint/test/test_roller.py
+++ b/python/mozlint/test/test_roller.py
@@ -7,58 +7,57 @@ from __future__ import absolute_import
 import os
 import platform
 import sys
 
 import mozunit
 import pytest
 
 from mozlint import ResultContainer
-from mozlint.errors import LintersNotConfigured, LintException
+from mozlint.errors import LintersNotConfigured
 
 
 here = os.path.abspath(os.path.dirname(__file__))
 
 
 linters = ('string.yml', 'regex.yml', 'external.yml')
 
 
 def test_roll_no_linters_configured(lint, files):
     with pytest.raises(LintersNotConfigured):
         lint.roll(files)
 
 
 def test_roll_successful(lint, linters, files):
     lint.read(linters)
 
+    assert lint.results is None
     result = lint.roll(files)
     assert len(result) == 1
+    assert lint.results == result
     assert lint.failed == set([])
 
     path = result.keys()[0]
     assert os.path.basename(path) == 'foobar.js'
 
     errors = result[path]
     assert isinstance(errors, list)
     assert len(errors) == 6
 
     container = errors[0]
     assert isinstance(container, ResultContainer)
     assert container.rule == 'no-foobar'
 
 
-def test_roll_catch_exception(lint, lintdir, files):
+def test_roll_catch_exception(lint, lintdir, files, capfd):
     lint.read(os.path.join(lintdir, 'raises.yml'))
 
-    # suppress printed traceback from test output
-    old_stderr = sys.stderr
-    sys.stderr = open(os.devnull, 'w')
-    with pytest.raises(LintException):
-        lint.roll(files)
-    sys.stderr = old_stderr
+    lint.roll(files)  # assert not raises
+    out, err = capfd.readouterr()
+    assert 'LintException' in err
 
 
 def test_roll_with_excluded_path(lint, linters, files):
     lint.lintargs.update({'exclude': ['**/foobar.js']})
 
     lint.read(linters)
     result = lint.roll(files)
 
@@ -71,46 +70,46 @@ def test_roll_with_invalid_extension(lin
     result = lint.roll(os.path.join(filedir, 'foobar.py'))
     assert len(result) == 0
     assert lint.failed == set([])
 
 
 def test_roll_with_failure_code(lint, lintdir, files):
     lint.read(os.path.join(lintdir, 'badreturncode.yml'))
 
-    assert lint.failed == set([])
+    assert lint.failed is None
     result = lint.roll(files, num_procs=1)
     assert len(result) == 0
     assert lint.failed == set(['BadReturnCodeLinter'])
 
 
-def fake_run_linters(config, paths, **lintargs):
+def fake_run_worker(config, paths, **lintargs):
     return {'count': [1]}, []
 
 
 @pytest.mark.skipif(platform.system() == 'Windows',
                     reason="monkeypatch issues with multiprocessing on Windows")
 @pytest.mark.parametrize('num_procs', [1, 4, 8, 16])
 def test_number_of_jobs(monkeypatch, lint, linters, files, num_procs):
-    monkeypatch.setattr(sys.modules[lint.__module__], '_run_linters', fake_run_linters)
+    monkeypatch.setattr(sys.modules[lint.__module__], '_run_worker', fake_run_worker)
 
     lint.read(linters)
     num_jobs = len(lint.roll(files, num_procs=num_procs)['count'])
 
     if len(files) >= num_procs:
         assert num_jobs == num_procs * len(linters)
     else:
         assert num_jobs == len(files) * len(linters)
 
 
 @pytest.mark.skipif(platform.system() == 'Windows',
                     reason="monkeypatch issues with multiprocessing on Windows")
 @pytest.mark.parametrize('max_paths,expected_jobs', [(1, 12), (4, 6), (16, 6)])
 def test_max_paths_per_job(monkeypatch, lint, linters, files, max_paths, expected_jobs):
-    monkeypatch.setattr(sys.modules[lint.__module__], '_run_linters', fake_run_linters)
+    monkeypatch.setattr(sys.modules[lint.__module__], '_run_worker', fake_run_worker)
 
     files = files[:4]
     assert len(files) == 4
 
     linters = linters[:3]
     assert len(linters) == 3
 
     lint.MAX_PATHS_PER_JOB = max_paths
@@ -128,13 +127,13 @@ def test_setup(lint, linters, filedir, c
 
     lint.read(linters)
     lint.setup()
     out, err = capfd.readouterr()
     assert 'setup passed' in out
     assert 'setup failed' in out
     assert 'setup raised' in out
     assert 'error: problem with lint setup, skipping' in out
-    assert lint.failed == set(['SetupFailedLinter', 'SetupRaisedLinter'])
+    assert lint.failed_setup == set(['SetupFailedLinter', 'SetupRaisedLinter'])
 
 
 if __name__ == '__main__':
     mozunit.main()