Bug 1369711 - [mozlint] Make sure KeyboardInterrupts are handled well wherever they happen draft
author Andrew Halberstadt <ahalberstadt@mozilla.com>
Fri, 23 Feb 2018 08:55:06 -0500
changeset 768899 66dff277330ff24eefbf59feb88540d5dc7e415f
parent 767031 ca8f497d2eb9d5b1dd6459288cc963da569b5c51
push id 103005
push user ahalberstadt@mozilla.com
push date Sat, 17 Mar 2018 01:56:28 +0000
bugs 1369711
milestone 61.0a1
Bug 1369711 - [mozlint] Make sure KeyboardInterrupts are handled well wherever they happen. There are a few pieces needed here to properly handle KeyboardInterrupts. 1. All in-progress work needs to abort. Ideally the underlying linters will be able to catch KeyboardInterrupt, and return partial results (like the flake8 linter does). Linters may alternatively allow the KeyboardInterrupt to propagate up. Mozlint will catch and handle this appropriately, though any results found will be lost. The only change to this behaviour was fixing a bug in the flake8 linter. 2. Any unstarted jobs need to be canceled. In concurrent.futures, there are two different queues. First, jobs are placed on the work queue, which is just a list maintained by the parent process. As workers become available, jobs are moved off the work queue, and onto the call queue (which is a multiprocessing.Queue). Jobs that live on the work queue can be canceled with 'future.cancel()', whereas jobs that live on the call queue cannot. The number of extra jobs that are stored on the call queue is determined by this variable: https://hg.mozilla.org/mozilla-central/file/deb7714a7bcd/third_party/python/futures/concurrent/futures/process.py#l86 In this patch, the parent process' sigint handler (which will be called on Ctrl-C) is responsible for canceling all the jobs on the work queue. For the jobs on the call queue, the best we can do is set a global variable that tells workers to abort early. 3. Idle workers should exit gracefully. When there are no more jobs left, workers will block on the call queue (either waiting for more jobs, or waiting for the executor to send the shutdown signal). If a KeyboardInterrupt is received while a worker is blocking, it isn't possible to intercept that anywhere (due to quirks of how concurrent.futures is implemented). The InterruptableQueue class was created to solve this problem. It will return None instead of propagating KeyboardInterrupt.
A None value will wake the worker up and tell it to shut down gracefully. This way, we avoid cryptic tracebacks in the output. With all of these various pieces solved, pressing Ctrl-C appears to always exit gracefully, sometimes even printing partial results. MozReview-Commit-ID: 36Pe3bbUKmk
python/mozlint/mozlint/roller.py
python/mozlint/test/linters/external.py
python/mozlint/test/linters/slow.yml
python/mozlint/test/runcli.py
python/mozlint/test/test_roller.py
tools/lint/python/flake8.py
--- a/python/mozlint/mozlint/roller.py
+++ b/python/mozlint/mozlint/roller.py
@@ -7,48 +7,83 @@ from __future__ import absolute_import, 
 import os
 import signal
 import sys
 import traceback
 from collections import defaultdict
 from concurrent.futures import ProcessPoolExecutor
 from math import ceil
 from multiprocessing import cpu_count
+from multiprocessing.queues import Queue
 from subprocess import CalledProcessError
 
 from mozversioncontrol import get_repository_object, MissingUpstreamRepo, InvalidRepoPath
 
 from .errors import LintersNotConfigured
 from .parser import Parser
 from .pathutils import findobject
 from .types import supported_types
 
+SHUTDOWN = False
+orig_sigint = signal.getsignal(signal.SIGINT)
+
 
 def _run_worker(config, paths, **lintargs):
     results = defaultdict(list)
     failed = []
 
+    if SHUTDOWN:
+        return results, failed
+
     func = supported_types[config['type']]
     try:
         res = func(paths, config, **lintargs) or []
     except Exception:
         traceback.print_exc()
         res = 1
+    except (KeyboardInterrupt, SystemExit):
+        return results, failed
     finally:
         sys.stdout.flush()
 
     if not isinstance(res, (list, tuple)):
         if res:
             failed.append(config['name'])
     else:
         for r in res:
             results[r.path].append(r)
     return results, failed
 
 
+class InterruptableQueue(Queue):
+    """A multiprocessing.Queue that catches KeyboardInterrupt when a worker is
+    blocking on it and returns None.
+
+    This is needed to gracefully handle KeyboardInterrupts when a worker is
+    blocking on ProcessPoolExecutor's call queue.
+    """
+
+    def get(self, *args, **kwargs):
+        try:
+            return Queue.get(self, *args, **kwargs)
+        except KeyboardInterrupt:
+            return None
+
+
+def _worker_sigint_handler(signum, frame):
+    """Sigint handler for the worker subprocesses.
+
+    Tells workers not to process the extra jobs on the call queue that couldn't
+    be canceled by the parent process.
+    """
+    global SHUTDOWN
+    SHUTDOWN = True
+    orig_sigint(signum, frame)
+
+
 class LintRoller(object):
     """Registers and runs linters.
 
     :param root: Path to which relative paths will be joined. If
                  unspecified, root will either be determined from
                  version control or cwd.
     :param lintargs: Arguments to pass to the underlying linter(s).
     """
@@ -181,22 +216,38 @@ class LintRoller(object):
         paths = map(os.path.abspath, paths)
 
         num_procs = num_procs or cpu_count()
         jobs = list(self._generate_jobs(paths, num_procs))
 
         # Make sure we never spawn more processes than we have jobs.
         num_procs = min(len(jobs), num_procs)
 
+        signal.signal(signal.SIGINT, _worker_sigint_handler)
         executor = ProcessPoolExecutor(num_procs)
+        executor._call_queue = InterruptableQueue(executor._call_queue._maxsize)
 
         # Submit jobs to the worker pool. The _collect_results method will be
-        # called when a job is finished.
+        # called when a job is finished. We store the futures so that they can
+        # be canceled in the event of a KeyboardInterrupt.
+        futures = []
         for job in jobs:
             future = executor.submit(_run_worker, *job, **self.lintargs)
             future.add_done_callback(self._collect_results)
+            futures.append(future)
 
-        # Ignore SIGINT in parent so we can still get partial results
-        # from child processes. These should shutdown quickly anyway.
-        orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
-        executor.shutdown()  # blocks until all workers have finished
+        def _parent_sigint_handler(signum, frame):
+            """Sigint handler for the parent process.
+
+            Cancels all jobs that have not yet been placed on the call queue.
+            The parent process won't exit until all workers have terminated.
+            Assuming the linters are implemented properly, this shouldn't take
+            more than a couple seconds.
+            """
+            [f.cancel() for f in futures]
+            executor.shutdown(wait=False)
+            print("\nwarning: not all files were linted")
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+        signal.signal(signal.SIGINT, _parent_sigint_handler)
+        executor.shutdown()
         signal.signal(signal.SIGINT, orig_sigint)
         return self.results
--- a/python/mozlint/test/linters/external.py
+++ b/python/mozlint/test/linters/external.py
@@ -1,15 +1,16 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function
 
 import os
+import time
 
 from mozlint import result
 from mozlint.errors import LintException
 
 
 def badreturncode(files, config, **lintargs):
     return 1
 
@@ -31,16 +32,21 @@ def external(files, config, **lintargs):
                         config, path=path, lineno=i+1, column=1, rule="no-foobar"))
     return results
 
 
 def raises(files, config, **lintargs):
     raise LintException("Oh no something bad happened!")
 
 
+def slow(files, config, **lintargs):
+    time.sleep(2)
+    return []
+
+
 def structured(files, config, logger, **kwargs):
     for path in files:
         if os.path.isdir(path):
             continue
 
         with open(path, 'r') as fh:
             for i, line in enumerate(fh.readlines()):
                 if 'foobar' in line:
new file mode 100644
--- /dev/null
+++ b/python/mozlint/test/linters/slow.yml
@@ -0,0 +1,8 @@
+---
+SlowLinter:
+    description: A linter that takes awhile to run
+    include:
+        - files
+    type: external
+    extensions: ['.js', '.jsm']
+    payload: external:slow
new file mode 100644
--- /dev/null
+++ b/python/mozlint/test/runcli.py
@@ -0,0 +1,20 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import
+
+import os
+import sys
+
+here = os.path.abspath(os.path.dirname(__file__))
+sys.path.insert(0, os.path.join(os.path.dirname(here), 'mozlint'))
+
+from mozlint import cli
+cli.SEARCH_PATHS.append(os.path.join(here, 'linters'))
+
+if __name__ == '__main__':
+    parser = cli.MozlintParser()
+    args = vars(parser.parse_args(sys.argv[1:]))
+    args['root'] = here
+    sys.exit(cli.run(**args))
--- a/python/mozlint/test/test_roller.py
+++ b/python/mozlint/test/test_roller.py
@@ -1,17 +1,20 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import
 
 import os
 import platform
+import signal
+import subprocess
 import sys
+import time
 
 import mozunit
 import pytest
 
 from mozlint import ResultContainer
 from mozlint.errors import LintersNotConfigured
 
 
@@ -113,16 +116,33 @@ def test_max_paths_per_job(monkeypatch, 
     assert len(linters) == 3
 
     lint.MAX_PATHS_PER_JOB = max_paths
     lint.read(linters)
     num_jobs = len(lint.roll(files, num_procs=2)['count'])
     assert num_jobs == expected_jobs
 
 
+@pytest.mark.skipif(platform.system() == 'Windows',
+                    reason="signal.CTRL_C_EVENT isn't causing a KeyboardInterrupt on Windows")
+def test_keyboard_interrupt():
+    # We use two linters so we'll have two jobs. One (string.yml) will complete
+    # quickly. The other (slow.yml) will run slowly.  This way the first worker
+    # will be stuck blocking on the ProcessPoolExecutor._call_queue when the
+    # signal arrives and the other will still be doing work.
+    cmd = [sys.executable, 'runcli.py', '-l=string', '-l=slow']
+    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=here)
+    time.sleep(1)
+    proc.send_signal(signal.SIGINT)
+
+    out = proc.communicate()[0]
+    assert 'warning: not all files were linted' in out
+    assert 'Traceback' not in out
+
+
 linters = ('setup.yml', 'setupfailed.yml', 'setupraised.yml')
 
 
 def test_setup(lint, linters, filedir, capfd):
     with pytest.raises(LintersNotConfigured):
         lint.setup()
 
     lint.read(linters)
--- a/tools/lint/python/flake8.py
+++ b/tools/lint/python/flake8.py
@@ -126,16 +126,17 @@ def reinstall_flake8():
 
 def run_process(config, cmd):
     proc = Flake8Process(config, cmd)
     proc.run()
     try:
         proc.wait()
     except KeyboardInterrupt:
         proc.kill()
+        return 1
 
 
 def setup(root):
     if not reinstall_flake8():
         print(FLAKE8_INSTALL_ERROR)
         return 1
 
 
@@ -158,11 +159,12 @@ def lint(paths, config, **lintargs):
     for configs, paths in paths_by_config.items():
         cmd = cmdargs[:]
 
         if configs != 'default':
             configs = reversed(configs.split(os.pathsep))
             cmd.extend(['--append-config={}'.format(c) for c in configs])
 
         cmd.extend(paths)
-        run_process(config, cmd)
+        if run_process(config, cmd):
+            break
 
     return results