Bug 1400716 - Explicitly drain all queues on exit, r=maja_zf
The fact that the queues were not drained on exit seemed to cause an
intermittent failure where one process tried to write to a queue that
another had closed. Draining the queues explicitly should avoid this
and ensure we surface hidden problems in the log.
MozReview-Commit-ID: 8ulLNpIcj5z
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/executormarionette.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/executormarionette.py
@@ -170,17 +170,21 @@ class MarionetteProtocol(Protocol):
def wait(self):
try:
socket_timeout = self.marionette.client.socket_timeout
except AttributeError:
# This can happen if there was a crash
return
if socket_timeout:
- self.marionette.timeout.script = socket_timeout / 2
+ try:
+ self.marionette.timeout.script = socket_timeout / 2
+ except (socket.error, IOError):
+ self.logger.debug("Socket closed")
+ return
self.marionette.switch_to_window(self.runner_handle)
while True:
try:
self.marionette.execute_async_script("")
except errors.NoSuchWindowException:
# The window closed
break
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
@@ -115,28 +115,35 @@ class TestRunner(object):
self.result_queue.put((command, args))
def start_runner(runner_command_queue, runner_result_queue,
executor_cls, executor_kwargs,
executor_browser_cls, executor_browser_kwargs,
stop_flag):
"""Launch a TestRunner in a new process"""
+ def log(level, msg):
+ runner_result_queue.put(("log", (level, {"message": msg})))
+
+ def handle_error(e):
+ log("critical", traceback.format_exc())
+ stop_flag.set()
+
try:
browser = executor_browser_cls(**executor_browser_kwargs)
executor = executor_cls(browser, **executor_kwargs)
with TestRunner(runner_command_queue, runner_result_queue, executor) as runner:
try:
runner.run()
except KeyboardInterrupt:
stop_flag.set()
- except Exception:
- runner_result_queue.put(("log", ("critical", {"message": traceback.format_exc()})))
- print >> sys.stderr, traceback.format_exc()
- stop_flag.set()
+ except Exception as e:
+ handle_error(e)
+ except Exception as e:
+ handle_error(e)
finally:
runner_command_queue = None
runner_result_queue = None
manager_count = 0
@@ -384,16 +391,17 @@ class TestRunnerManager(threading.Thread
None: {
"runner_teardown": self.runner_teardown,
"log": self.log,
"error": self.error
}
}
try:
command, data = self.command_queue.get(True, 1)
+ self.logger.debug("Got command: %r" % command)
except IOError:
self.logger.error("Got IOError from poll")
return RunnerManagerState.restarting(0)
except Empty:
if (self.debug_info and self.debug_info.interactive and
self.browser.started and not self.browser.is_alive()):
self.logger.debug("Debugger exited")
return RunnerManagerState.stop()
@@ -671,17 +679,28 @@ class TestRunnerManager(threading.Thread
self.remote_queue.put((command, args))
def cleanup(self):
self.logger.debug("TestManager cleanup")
if self.browser:
self.browser.cleanup()
while True:
try:
- self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait())))
+ cmd, data = self.command_queue.get_nowait()
+ except Empty:
+ break
+ else:
+ if cmd == "log":
+ self.log(*data)
+ else:
+ self.logger.warning("%r: %r" % (cmd, data))
+ while True:
+ try:
+ cmd, data = self.remote_queue.get_nowait()
+ self.logger.warning("%r: %r" % (cmd, data))
except Empty:
break
def make_test_queue(tests, test_source_cls, **test_source_kwargs):
queue = test_source_cls.make_queue(tests, **test_source_kwargs)
# There is a race condition that means sometimes we continue
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptlogging.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptlogging.py
@@ -1,11 +1,12 @@
import logging
import sys
import threading
+from Queue import Empty
from StringIO import StringIO
from multiprocessing import Queue
from mozlog import commandline, stdadapter, set_default_logger
from mozlog.structuredlog import StructuredLogger
def setup(args, defaults):
logger = args.pop('log', None)
@@ -46,17 +47,16 @@ class LogLevelRewriter(object):
def __call__(self, data):
if data["action"] == "log" and data["level"].upper() in self.from_levels:
data = data.copy()
data["level"] = self.to_level
return self.inner(data)
-
class LogThread(threading.Thread):
def __init__(self, queue, logger, level):
self.queue = queue
self.log_func = getattr(logger, level)
threading.Thread.__init__(self, name="Thread-Log")
self.daemon = True
def run(self):
@@ -121,10 +121,15 @@ class CaptureIO(object):
def __exit__(self, *args, **kwargs):
if self.do_capture:
sys.stdout, sys.stderr = self.original_stdio
if self.logging_queue is not None:
self.logger.info("Closing logging queue")
self.logging_queue.put(None)
if self.logging_thread is not None:
self.logging_thread.join(10)
+ while not self.logging_queue.empty():
+ try:
+ self.logger.warning("Dropping log message: %r", self.logging_queue.get())
+ except Exception:
+ pass
self.logging_queue.close()
self.logger.info("queue closed")