Bug 1363428 - Switch wptrunner to use a deque for test groups, r=ato draft
author James Graham <james@hoppipolla.co.uk>
Sun, 28 May 2017 21:14:28 +0100
changeset 600214 2d525263bb91e466bc49cb57f14e2efe2d7d59d7
parent 600213 d7be32e0ac3f4c076605e28a2d13b763b328735c
child 600215 c0b1c9e10c01ae450f0b8b43881e7a6503a4c5ec
push id 65688
push user bmo:james@hoppipolla.co.uk
push date Sat, 24 Jun 2017 11:04:46 +0000
reviewers ato
bugs 1363428
milestone 56.0a1
Bug 1363428 - Switch wptrunner to use a deque for test groups, r=ato. Initially wptrunner had a single test queue that was shared between all processes. Then, for --run-by-dir, it changed to a queue of queues. This change makes it a queue of deques, which is simpler, since the test queues themselves are no longer shared between processes. It also changes the implementation when we aren't using --run-by-dir but are using multiple processes, so that the tests are pre-grouped into N queues rather than sharing a single queue between all processes. This is necessary in order to use the deque, of course, but more importantly it anticipates a change in which we will pre-compute per-queue metadata for each queue; that doesn't work well with one shared queue. The downside of this change is that there is no work stealing, so it may be less efficient if we randomly assign many slow jobs to one particular process. MozReview-Commit-ID: 7e0Odk7yDwr
testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
@@ -559,79 +559,77 @@ class TestLoader(object):
                 groups.add(group)
 
         return groups
 
 
 class TestSource(object):
     __metaclass__ = ABCMeta
 
-    @abstractmethod
-    def queue_tests(self, test_queue):
-        pass
+    def __init__(self, test_queue):
+        self.test_queue = test_queue
+        self.current_group = None
 
     @abstractmethod
-    def requeue_test(self, test):
+    #@classmethod (doesn't compose with @abstractmethod)
+    def make_queue(cls, tests, **kwargs):
         pass
 
-    def __enter__(self):
-        return self
+    def group(self):
+        if not self.current_group or len(self.current_group) == 0:
+            try:
+                self.current_group = self.test_queue.get(block=False)
+            except Empty:
+                return None
+        return self.current_group
+
+
+class GroupedSource(TestSource):
+    @classmethod
+    def new_group(cls, state, test, **kwargs):
+        raise NotImplementedError
 
-    def __exit__(self, *args, **kwargs):
-        pass
+    @classmethod
+    def make_queue(cls, tests, **kwargs):
+        test_queue = Queue()
+        groups = []
+
+        state = {}
+
+        for test in tests:
+            if cls.new_group(state, test, **kwargs):
+                groups.append(deque([]))
+
+            group = groups[-1]
+            group.append(test)
+
+        for item in groups:
+            test_queue.put(item)
+        return test_queue
 
 
 class SingleTestSource(TestSource):
-    def __init__(self, test_queue):
-        self.test_queue = test_queue
-
     @classmethod
-    def queue_tests(cls, test_queue, test_type, tests):
-        for test in tests[test_type]:
-            test_queue.put(test)
+    def make_queue(cls, tests, **kwargs):
+        test_queue = Queue()
+        processes = kwargs["processes"]
+        queues = [deque([]) for _ in xrange(processes)]
+        for test in tests:
+            idx = hash(test.id) % processes
+            group = queues[idx]
+            group.append(test)
 
-    def get_queue(self):
-        if self.test_queue.empty():
-            return None
-        return self.test_queue
+        for item in queues:
+            test_queue.put(item)
 
-    def requeue_test(self, test):
-        self.test_queue.put(test)
+        return test_queue
 
-class PathGroupedSource(TestSource):
-    def __init__(self, test_queue):
-        self.test_queue = test_queue
-        self.current_queue = None
 
+class PathGroupedSource(GroupedSource):
     @classmethod
-    def queue_tests(cls, test_queue, test_type, tests, depth=None):
+    def new_group(cls, state, test, **kwargs):
+        depth = kwargs.get("depth")
         if depth is True:
             depth = None
-
-        prev_path = None
-        group = None
-
-        for test in tests[test_type]:
-            path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
-            if path != prev_path:
-                group = []
-                test_queue.put(group)
-                prev_path = path
-
-            group.append(test)
-
-    def get_queue(self):
-        if not self.current_queue or self.current_queue.empty():
-            try:
-                data = self.test_queue.get(block=True, timeout=1)
-                self.current_queue = Queue()
-                for item in data:
-                    self.current_queue.put(item)
-            except Empty:
-                return None
-        return self.current_queue
-
-    def requeue_test(self, test):
-        self.current_queue.put(test)
-
-    def __exit__(self, *args, **kwargs):
-        if self.current_queue:
-            self.current_queue.close()
+        path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
+        rv = path != state.get("prev_path")
+        state["prev_path"] = path
+        return rv
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
@@ -142,18 +142,16 @@ manager_count = 0
 
 def next_manager_number():
     global manager_count
     local = manager_count = manager_count + 1
     return local
 
 
 class BrowserManager(object):
-    init_lock = threading.Lock()
-
     def __init__(self, logger, browser, command_queue, no_timeout=False):
         self.logger = logger
         self.browser = browser
         self.no_timeout = no_timeout
         self.browser_settings = None
         self.last_test = None
 
         self.started = False
@@ -175,37 +173,34 @@ class BrowserManager(object):
         # It seems that this lock is helpful to prevent some race that otherwise
         # sometimes stops the spawned processes initalising correctly, and
         # leaves this thread hung
         if self.init_timer is not None:
             self.init_timer.cancel()
 
         self.logger.debug("Init called, starting browser and runner")
 
-        with self.init_lock:
-            # Guard against problems initialising the browser or the browser
-            # remote control method
-            if not self.no_timeout:
-                self.init_timer = threading.Timer(self.browser.init_timeout,
-                                                  self.init_timeout)
-            try:
-                if self.init_timer is not None:
-                    self.init_timer.start()
-                self.logger.debug("Starting browser with settings %r" % self.browser_settings)
-                self.browser.start(**self.browser_settings)
-                self.browser_pid = self.browser.pid()
-            except:
-                self.logger.warning("Failure during init %s" % traceback.format_exc())
-                if self.init_timer is not None:
-                    self.init_timer.cancel()
-                self.logger.error(traceback.format_exc())
-                succeeded = False
-            else:
-                succeeded = True
-                self.started = True
+        if not self.no_timeout:
+            self.init_timer = threading.Timer(self.browser.init_timeout,
+                                              self.init_timeout)
+        try:
+            if self.init_timer is not None:
+                self.init_timer.start()
+            self.logger.debug("Starting browser with settings %r" % self.browser_settings)
+            self.browser.start(**self.browser_settings)
+            self.browser_pid = self.browser.pid()
+        except:
+            self.logger.warning("Failure during init %s" % traceback.format_exc())
+            if self.init_timer is not None:
+                self.init_timer.cancel()
+            self.logger.error(traceback.format_exc())
+            succeeded = False
+        else:
+            succeeded = True
+            self.started = True
 
         return succeeded
 
     def send_message(self, command, *args):
         self.command_queue.put((command, args))
 
     def init_timeout(self):
         # This is called from a seperate thread, so we send a message to the
@@ -232,31 +227,29 @@ class BrowserManager(object):
         self.browser.log_crash(process=self.browser_pid, test=test_id)
 
     def is_alive(self):
         return self.browser.is_alive()
 
 
 class _RunnerManagerState(object):
     before_init = namedtuple("before_init", [])
-    initalizing = namedtuple("initalizing_browser",
-                             ["test", "test_queue", "failure_count"])
-    running = namedtuple("running", ["test", "test_queue"])
-    restarting = namedtuple("restarting", ["test", "test_queue"])
+    initializing = namedtuple("initializing_browser",
+                              ["test", "test_group", "failure_count"])
+    running = namedtuple("running", ["test", "test_group"])
+    restarting = namedtuple("restarting", ["test", "test_group"])
     error = namedtuple("error", [])
     stop = namedtuple("stop", [])
 
 
 RunnerManagerState = _RunnerManagerState()
 
 
 class TestRunnerManager(threading.Thread):
-    init_lock = threading.Lock()
-
-    def __init__(self, suite_name, tests, test_source_cls, browser_cls, browser_kwargs,
+    def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
                  executor_cls, executor_kwargs, stop_flag, pause_after_test=False,
                  pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None):
         """Thread that owns a single TestRunner process and any processes required
         by the TestRunner (e.g. the Firefox binary).
 
         TestRunnerManagers are responsible for launching the browser process and the
         runner process, and for logging the test progress. The actual test running
         is done by the TestRunner. In particular they:
@@ -266,28 +259,24 @@ class TestRunnerManager(threading.Thread
         * Tell the TestRunner to start a test, if any
         * Log that the test started
         * Log the test results
         * Take any remedial action required e.g. restart crashed or hung
           processes
         """
         self.suite_name = suite_name
 
-        self.tests = tests
-        self.test_source_cls = test_source_cls
-        self.test_queue = None
+        self.test_source = test_source_cls(test_queue)
 
         self.browser_cls = browser_cls
         self.browser_kwargs = browser_kwargs
 
         self.executor_cls = executor_cls
         self.executor_kwargs = executor_kwargs
 
-        self.test_source = None
-
         # Flags used to shut down this thread if we get a sigint
         self.parent_stop_flag = stop_flag
         self.child_stop_flag = multiprocessing.Event()
 
         self.pause_after_test = pause_after_test
         self.pause_on_unexpected = pause_on_unexpected
         self.restart_on_unexpected = restart_on_unexpected
         self.debug_info = debug_info
@@ -316,25 +305,24 @@ class TestRunnerManager(threading.Thread
         """Main loop for the TestManager.
 
         TestManagers generally receive commands from their
         TestRunner updating them on the status of a test. They
         may also have a stop flag set by the main thread indicating
         that the manager should shut down the next time the event loop
         spins."""
         self.logger = structuredlog.StructuredLogger(self.suite_name)
-        with self.browser_cls(self.logger, **self.browser_kwargs) as browser, self.test_source_cls(self.tests) as test_source:
+        with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
             self.browser = BrowserManager(self.logger,
                                           browser,
                                           self.command_queue,
                                           no_timeout=self.debug_info is not None)
-            self.test_source = test_source
             dispatch = {
                 RunnerManagerState.before_init: self.start_init,
-                RunnerManagerState.initalizing: self.init,
+                RunnerManagerState.initializing: self.init,
                 RunnerManagerState.running: self.run_test,
                 RunnerManagerState.restarting: self.restart_runner
             }
 
             self.state = RunnerManagerState.before_init()
             end_states = (RunnerManagerState.stop,
                           RunnerManagerState.error)
 
@@ -369,17 +357,17 @@ class TestRunnerManager(threading.Thread
                 clean = isinstance(self.state, RunnerManagerState.stop)
                 self.stop_runner(force=not clean)
                 self.teardown()
         self.logger.debug("TestRunnerManager main loop terminated")
 
     def wait_event(self):
         dispatch = {
             RunnerManagerState.before_init: {},
-            RunnerManagerState.initalizing:
+            RunnerManagerState.initializing:
             {
                 "init_succeeded": self.init_succeeded,
                 "init_failed": self.init_failed,
             },
             RunnerManagerState.running:
             {
                 "test_ended": self.test_ended,
                 "wait_finished": self.wait_finished,
@@ -427,50 +415,49 @@ class TestRunnerManager(threading.Thread
             f = (dispatch.get(self.state.__class__, {}).get(command) or
                  dispatch.get(None, {}).get(command))
             if not f:
                 self.logger.warning("Got command %s in state %s" %
                                     (command, self.state.__class__.__name__))
                 return
             return f(*data)
 
-
     def should_stop(self):
         return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set()
 
     def start_init(self):
-        test, test_queue = self.get_next_test()
+        test, test_group = self.get_next_test()
         if test is None:
             return RunnerManagerState.stop()
         else:
-            return RunnerManagerState.initalizing(test, test_queue, 0)
+            return RunnerManagerState.initializing(test, test_group, 0)
 
     def init(self):
-        assert isinstance(self.state, RunnerManagerState.initalizing)
+        assert isinstance(self.state, RunnerManagerState.initializing)
         if self.state.failure_count > self.max_restarts:
             self.logger.error("Max restarts exceeded")
             return RunnerManagerState.error()
 
         self.browser.update_settings(self.state.test)
 
         result = self.browser.init()
         if result is Stop:
             return RunnerManagerState.error()
         elif not result:
-            return RunnerManagerState.initalizing(self.state.test,
-                                                  self.state.test_queue,
-                                                  self.state.failure_count + 1)
+            return RunnerManagerState.initializing(self.state.test,
+                                                   self.state.test_group,
+                                                   self.state.failure_count + 1)
         else:
             self.start_test_runner()
 
     def start_test_runner(self):
         # Note that we need to be careful to start the browser before the
         # test runner to ensure that any state set when the browser is started
         # can be passed in to the test runner.
-        assert isinstance(self.state, RunnerManagerState.initalizing)
+        assert isinstance(self.state, RunnerManagerState.initializing)
         assert self.command_queue is not None
         assert self.remote_queue is not None
         self.logger.info("Starting runner")
         executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()
 
         args = (self.remote_queue,
                 self.command_queue,
                 self.executor_cls,
@@ -481,53 +468,48 @@ class TestRunnerManager(threading.Thread
         self.test_runner_proc = Process(target=start_runner,
                                         args=args,
                                         name="Thread-TestRunner-%i" % self.manager_number)
         self.test_runner_proc.start()
         self.logger.debug("Test runner started")
         # Now we wait for either an init_succeeded event or an init_failed event
 
     def init_succeeded(self):
-        assert isinstance(self.state, RunnerManagerState.initalizing)
+        assert isinstance(self.state, RunnerManagerState.initializing)
         self.browser.after_init()
         return RunnerManagerState.running(self.state.test,
-                                          self.state.test_queue)
+                                          self.state.test_group)
 
     def init_failed(self):
-        assert isinstance(self.state, RunnerManagerState.initalizing)
+        assert isinstance(self.state, RunnerManagerState.initializing)
         self.browser.after_init()
         self.stop_runner(force=True)
-        return RunnerManagerState.initalizing(self.state.test,
-                                              self.state.test_queue,
-                                              self.state.failure_count + 1)
+        return RunnerManagerState.initializing(self.state.test,
+                                               self.state.test_group,
+                                               self.state.failure_count + 1)
 
-    def get_next_test(self, test_queue=None):
+    def get_next_test(self, test_group=None):
         test = None
         while test is None:
-            if test_queue is None:
-                test_queue = self.test_source.get_queue()
-                if test_queue is None:
+            while test_group is None or len(test_group) == 0:
+                test_group = self.test_source.group()
+                if test_group is None:
                     self.logger.info("No more tests")
                     return None, None
-            try:
-                # Need to block here just to allow for contention with other processes
-                test = test_queue.get(block=True, timeout=2)
-            except Empty:
-                if test_queue.empty():
-                    test_queue = None
-        return test, test_queue
+            test = test_group.popleft()
+        return test, test_group
 
     def run_test(self):
         assert isinstance(self.state, RunnerManagerState.running)
         assert self.state.test is not None
 
         if self.browser.update_settings(self.state.test):
             self.logger.info("Restarting browser for new test environment")
             return RunnerManagerState.restarting(self.state.test,
-                                                 self.state.test_queue)
+                                                 self.state.test_group)
 
         self.logger.test_start(self.state.test.id)
         self.send_message("run_test", self.state.test)
 
     def test_ended(self, test, results):
         """Handle the end of a test.
 
         Output the result of each subtest, and the result of the overall
@@ -590,32 +572,32 @@ class TestRunnerManager(threading.Thread
         # The browser should be stopped already, but this ensures we do any post-stop
         # processing
         self.logger.debug("Wait finished")
 
         return self.after_test_end(True)
 
     def after_test_end(self, restart):
         assert isinstance(self.state, RunnerManagerState.running)
-        test, test_queue = self.get_next_test()
+        test, test_group = self.get_next_test()
         if test is None:
             return RunnerManagerState.stop()
-        if test_queue != self.state.test_queue:
+        if test_group != self.state.test_group:
             # We are starting a new group of tests, so force a restart
             restart = True
         if restart:
-            return RunnerManagerState.restarting(test, test_queue)
+            return RunnerManagerState.restarting(test, test_group)
         else:
-            return RunnerManagerState.running(test, test_queue)
+            return RunnerManagerState.running(test, test_group)
 
     def restart_runner(self):
         """Stop and restart the TestRunner"""
         assert isinstance(self.state, RunnerManagerState.restarting)
         self.stop_runner()
-        return RunnerManagerState.initalizing(self.state.test, self.state.test_queue, 0)
+        return RunnerManagerState.initializing(self.state.test, self.state.test_group, 0)
 
     def log(self, action, kwargs):
         getattr(self.logger, action)(**kwargs)
 
     def error(self, message):
         self.logger.error(message)
         self.restart_runner()
 
@@ -668,44 +650,26 @@ class TestRunnerManager(threading.Thread
         if self.browser:
             self.browser.cleanup()
         while True:
             try:
                 self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait())))
             except Empty:
                 break
 
-class TestQueue(object):
-    def __init__(self, test_source_cls, test_type, tests, **kwargs):
-        self.queue = None
-        self.test_source_cls = test_source_cls
-        self.test_type = test_type
-        self.tests = tests
-        self.kwargs = kwargs
 
-    def __enter__(self):
-        if not self.tests[self.test_type]:
-            return None
+def make_test_queue(tests, test_source_cls, **test_source_kwargs):
+    queue = test_source_cls.make_queue(tests, **test_source_kwargs)
 
-        self.queue = Queue()
-        has_tests = self.test_source_cls.queue_tests(self.queue,
-                                                     self.test_type,
-                                                     self.tests,
-                                                     **self.kwargs)
-        # There is a race condition that means sometimes we continue
-        # before the tests have been written to the underlying pipe.
-        # Polling the pipe for data here avoids that
-        self.queue._reader.poll(10)
-        assert not self.queue.empty()
-        return self.queue
-
-    def __exit__(self, *args, **kwargs):
-        if self.queue is not None:
-            self.queue.close()
-            self.queue = None
+    # There is a race condition that means sometimes we continue
+    # before the tests have been written to the underlying pipe.
+    # Polling the pipe for data here avoids that
+    queue._reader.poll(10)
+    assert not queue.empty()
+    return queue
 
 
 class ManagerGroup(object):
     def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
                  browser_cls, browser_kwargs,
                  executor_cls, executor_kwargs,
                  pause_after_test=False,
                  pause_on_unexpected=False,
@@ -725,52 +689,50 @@ class ManagerGroup(object):
         self.restart_on_unexpected = restart_on_unexpected
         self.debug_info = debug_info
 
         self.pool = set()
         # Event that is polled by threads so that they can gracefully exit in the face
         # of sigint
         self.stop_flag = threading.Event()
         self.logger = structuredlog.StructuredLogger(suite_name)
-        self.test_queue = None
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.stop()
 
     def run(self, test_type, tests):
         """Start all managers in the group"""
         self.logger.debug("Using %i processes" % self.size)
 
-        self.test_queue = TestQueue(self.test_source_cls,
-                                    test_type,
-                                    tests,
-                                    **self.test_source_kwargs)
-        with self.test_queue as test_queue:
-            if test_queue is None:
-                self.logger.info("No %s tests to run" % test_type)
-                return
-            for _ in range(self.size):
-                manager = TestRunnerManager(self.suite_name,
-                                            test_queue,
-                                            self.test_source_cls,
-                                            self.browser_cls,
-                                            self.browser_kwargs,
-                                            self.executor_cls,
-                                            self.executor_kwargs,
-                                            self.stop_flag,
-                                            self.pause_after_test,
-                                            self.pause_on_unexpected,
-                                            self.restart_on_unexpected,
-                                            self.debug_info)
-                manager.start()
-                self.pool.add(manager)
-            self.wait()
+        type_tests = tests[test_type]
+        if type_tests is None:
+            self.logger.info("No %s tests to run" % test_type)
+            return
+
+        test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)
+
+        for _ in range(self.size):
+            manager = TestRunnerManager(self.suite_name,
+                                        test_queue,
+                                        self.test_source_cls,
+                                        self.browser_cls,
+                                        self.browser_kwargs,
+                                        self.executor_cls,
+                                        self.executor_kwargs,
+                                        self.stop_flag,
+                                        self.pause_after_test,
+                                        self.pause_on_unexpected,
+                                        self.restart_on_unexpected,
+                                        self.debug_info)
+            manager.start()
+            self.pool.add(manager)
+        self.wait()
 
     def is_alive(self):
         """Boolean indicating whether any manager in the group is still alive"""
         return any(manager.is_alive() for manager in self.pool)
 
     def wait(self):
         """Wait for all the managers in the group to finish"""
         for item in self.pool:
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
@@ -147,23 +147,23 @@ def run_tests(config, test_paths, produc
             test_loader = kwargs["test_loader"]
         else:
             run_info, test_loader = get_loader(test_paths,
                                                product,
                                                ssl_env,
                                                run_info_extras=run_info_extras(**kwargs),
                                                **kwargs)
 
+        test_source_kwargs = {"processes": kwargs["processes"]}
         if kwargs["run_by_dir"] is False:
             test_source_cls = testloader.SingleTestSource
-            test_source_kwargs = {}
         else:
             # A value of None indicates infinite depth
             test_source_cls = testloader.PathGroupedSource
-            test_source_kwargs = {"depth": kwargs["run_by_dir"]}
+            test_source_kwargs["depth"] = kwargs["run_by_dir"]
 
         logger.info("Using %i client processes" % kwargs["processes"])
 
         unexpected_total = 0
 
         kwargs["pause_after_test"] = get_pause_after_test(test_loader, **kwargs)
 
         with env.TestEnvironment(test_paths,