Bug 1400716 - Explicitly drain all queues on exit, r=maja_zf draft
authorJames Graham <james@hoppipolla.co.uk>
Mon, 27 Nov 2017 16:16:55 +0000
changeset 713568 8cb2410d20b1e61f6c78ab5f0bece4d8e1c9f2e8
parent 713529 2ff08db67b917fba1558986f3f2f796260f970f8
child 713569 2941d29b2c9f011edcffa27b7011520bfc148637
push id93678
push userbmo:james@hoppipolla.co.uk
push dateWed, 20 Dec 2017 16:19:52 +0000
reviewersmaja_zf
bugs1400716
milestone59.0a1
Bug 1400716 - Explicitly drain all queues on exit, r=maja_zf The fact that the queues were not drained on exist seemed to cause an intermittent failure where one process tried to write to a queue that another had closed. Draining the queues explicitly should avoid this and ensure we surface hidden problems in the log. MozReview-Commit-ID: 8ulLNpIcj5z
testing/web-platform/tests/tools/wptrunner/wptrunner/executors/executormarionette.py
testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
testing/web-platform/tests/tools/wptrunner/wptrunner/wptlogging.py
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/executormarionette.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/executormarionette.py
@@ -170,17 +170,21 @@ class MarionetteProtocol(Protocol):
 
     def wait(self):
         try:
             socket_timeout = self.marionette.client.socket_timeout
         except AttributeError:
             # This can happen if there was a crash
             return
         if socket_timeout:
-            self.marionette.timeout.script = socket_timeout / 2
+            try:
+                self.marionette.timeout.script = socket_timeout / 2
+            except (socket.error, IOError):
+                self.logger.debug("Socket closed")
+                return
 
         self.marionette.switch_to_window(self.runner_handle)
         while True:
             try:
                 self.marionette.execute_async_script("")
             except errors.NoSuchWindowException:
                 # The window closed
                 break
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
@@ -115,28 +115,35 @@ class TestRunner(object):
         self.result_queue.put((command, args))
 
 
 def start_runner(runner_command_queue, runner_result_queue,
                  executor_cls, executor_kwargs,
                  executor_browser_cls, executor_browser_kwargs,
                  stop_flag):
     """Launch a TestRunner in a new process"""
+    def log(level, msg):
+        runner_result_queue.put(("log", (level, {"message": msg})))
+
+    def handle_error(e):
+        log("critical", traceback.format_exc())
+        stop_flag.set()
+
     try:
         browser = executor_browser_cls(**executor_browser_kwargs)
         executor = executor_cls(browser, **executor_kwargs)
         with TestRunner(runner_command_queue, runner_result_queue, executor) as runner:
             try:
                 runner.run()
             except KeyboardInterrupt:
                 stop_flag.set()
-    except Exception:
-        runner_result_queue.put(("log", ("critical", {"message": traceback.format_exc()})))
-        print >> sys.stderr, traceback.format_exc()
-        stop_flag.set()
+            except Exception as e:
+                handle_error(e)
+    except Exception as e:
+        handle_error(e)
     finally:
         runner_command_queue = None
         runner_result_queue = None
 
 
 manager_count = 0
 
 
@@ -384,16 +391,17 @@ class TestRunnerManager(threading.Thread
             None: {
                 "runner_teardown": self.runner_teardown,
                 "log": self.log,
                 "error": self.error
             }
         }
         try:
             command, data = self.command_queue.get(True, 1)
+            self.logger.debug("Got command: %r" % command)
         except IOError:
             self.logger.error("Got IOError from poll")
             return RunnerManagerState.restarting(0)
         except Empty:
             if (self.debug_info and self.debug_info.interactive and
                 self.browser.started and not self.browser.is_alive()):
                 self.logger.debug("Debugger exited")
                 return RunnerManagerState.stop()
@@ -671,17 +679,28 @@ class TestRunnerManager(threading.Thread
         self.remote_queue.put((command, args))
 
     def cleanup(self):
         self.logger.debug("TestManager cleanup")
         if self.browser:
             self.browser.cleanup()
         while True:
             try:
-                self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait())))
+                cmd, data = self.command_queue.get_nowait()
+            except Empty:
+                break
+            else:
+                if cmd == "log":
+                     self.log(*data)
+                else:
+                    self.logger.warning("%r: %r" % (cmd, data))
+        while True:
+            try:
+                cmd, data = self.remote_queue.get_nowait()
+                self.logger.warning("%r: %r" % (cmd, data))
             except Empty:
                 break
 
 
 def make_test_queue(tests, test_source_cls, **test_source_kwargs):
     queue = test_source_cls.make_queue(tests, **test_source_kwargs)
 
     # There is a race condition that means sometimes we continue
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptlogging.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptlogging.py
@@ -1,11 +1,12 @@
 import logging
 import sys
 import threading
+from Queue import Empty
 from StringIO import StringIO
 from multiprocessing import Queue
 
 from mozlog import commandline, stdadapter, set_default_logger
 from mozlog.structuredlog import StructuredLogger
 
 def setup(args, defaults):
     logger = args.pop('log', None)
@@ -46,17 +47,16 @@ class LogLevelRewriter(object):
 
     def __call__(self, data):
         if data["action"] == "log" and data["level"].upper() in self.from_levels:
             data = data.copy()
             data["level"] = self.to_level
         return self.inner(data)
 
 
-
 class LogThread(threading.Thread):
     def __init__(self, queue, logger, level):
         self.queue = queue
         self.log_func = getattr(logger, level)
         threading.Thread.__init__(self, name="Thread-Log")
         self.daemon = True
 
     def run(self):
@@ -121,10 +121,15 @@ class CaptureIO(object):
     def __exit__(self, *args, **kwargs):
         if self.do_capture:
             sys.stdout, sys.stderr = self.original_stdio
             if self.logging_queue is not None:
                 self.logger.info("Closing logging queue")
                 self.logging_queue.put(None)
                 if self.logging_thread is not None:
                     self.logging_thread.join(10)
+                while not self.logging_queue.empty():
+                    try:
+                        self.logger.warning("Dropping log message: %r", self.logging_queue.get())
+                    except Exception:
+                        pass
                 self.logging_queue.close()
                 self.logger.info("queue closed")