--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
@@ -559,79 +559,77 @@ class TestLoader(object):
groups.add(group)
return groups
class TestSource(object):
__metaclass__ = ABCMeta
- @abstractmethod
- def queue_tests(self, test_queue):
- pass
+ def __init__(self, test_queue):
+ self.test_queue = test_queue
+ self.current_group = None
@abstractmethod
- def requeue_test(self, test):
+ #@classmethod (doesn't compose with @abstractmethod)
+ def make_queue(cls, tests, **kwargs):
pass
- def __enter__(self):
- return self
+ def group(self):
+ if not self.current_group or len(self.current_group) == 0:
+ try:
+ self.current_group = self.test_queue.get(block=False)
+ except Empty:
+ return None
+ return self.current_group
+
+
+class GroupedSource(TestSource):
+ @classmethod
+ def new_group(cls, state, test, **kwargs):
+ raise NotImplementedError
- def __exit__(self, *args, **kwargs):
- pass
+ @classmethod
+ def make_queue(cls, tests, **kwargs):
+ test_queue = Queue()
+ groups = []
+
+ state = {}
+
+ for test in tests:
+ if cls.new_group(state, test, **kwargs):
+ groups.append(deque([]))
+
+ group = groups[-1]
+ group.append(test)
+
+ for item in groups:
+ test_queue.put(item)
+ return test_queue
class SingleTestSource(TestSource):
- def __init__(self, test_queue):
- self.test_queue = test_queue
-
@classmethod
- def queue_tests(cls, test_queue, test_type, tests):
- for test in tests[test_type]:
- test_queue.put(test)
+ def make_queue(cls, tests, **kwargs):
+ test_queue = Queue()
+ processes = kwargs["processes"]
+ queues = [deque([]) for _ in xrange(processes)]
+ for test in tests:
+ idx = hash(test.id) % processes
+ group = queues[idx]
+ group.append(test)
- def get_queue(self):
- if self.test_queue.empty():
- return None
- return self.test_queue
+ for item in queues:
+ test_queue.put(item)
- def requeue_test(self, test):
- self.test_queue.put(test)
+ return test_queue
-class PathGroupedSource(TestSource):
- def __init__(self, test_queue):
- self.test_queue = test_queue
- self.current_queue = None
+class PathGroupedSource(GroupedSource):
@classmethod
- def queue_tests(cls, test_queue, test_type, tests, depth=None):
+ def new_group(cls, state, test, **kwargs):
+ depth = kwargs.get("depth")
if depth is True:
depth = None
-
- prev_path = None
- group = None
-
- for test in tests[test_type]:
- path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
- if path != prev_path:
- group = []
- test_queue.put(group)
- prev_path = path
-
- group.append(test)
-
- def get_queue(self):
- if not self.current_queue or self.current_queue.empty():
- try:
- data = self.test_queue.get(block=True, timeout=1)
- self.current_queue = Queue()
- for item in data:
- self.current_queue.put(item)
- except Empty:
- return None
- return self.current_queue
-
- def requeue_test(self, test):
- self.current_queue.put(test)
-
- def __exit__(self, *args, **kwargs):
- if self.current_queue:
- self.current_queue.close()
+ path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
+ rv = path != state.get("prev_path")
+ state["prev_path"] = path
+ return rv
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
@@ -142,18 +142,16 @@ manager_count = 0
def next_manager_number():
global manager_count
local = manager_count = manager_count + 1
return local
class BrowserManager(object):
- init_lock = threading.Lock()
-
def __init__(self, logger, browser, command_queue, no_timeout=False):
self.logger = logger
self.browser = browser
self.no_timeout = no_timeout
self.browser_settings = None
self.last_test = None
self.started = False
@@ -175,37 +173,34 @@ class BrowserManager(object):
# It seems that this lock is helpful to prevent some race that otherwise
# sometimes stops the spawned processes initalising correctly, and
# leaves this thread hung
if self.init_timer is not None:
self.init_timer.cancel()
self.logger.debug("Init called, starting browser and runner")
- with self.init_lock:
- # Guard against problems initialising the browser or the browser
- # remote control method
- if not self.no_timeout:
- self.init_timer = threading.Timer(self.browser.init_timeout,
- self.init_timeout)
- try:
- if self.init_timer is not None:
- self.init_timer.start()
- self.logger.debug("Starting browser with settings %r" % self.browser_settings)
- self.browser.start(**self.browser_settings)
- self.browser_pid = self.browser.pid()
- except:
- self.logger.warning("Failure during init %s" % traceback.format_exc())
- if self.init_timer is not None:
- self.init_timer.cancel()
- self.logger.error(traceback.format_exc())
- succeeded = False
- else:
- succeeded = True
- self.started = True
+ if not self.no_timeout:
+ self.init_timer = threading.Timer(self.browser.init_timeout,
+ self.init_timeout)
+ try:
+ if self.init_timer is not None:
+ self.init_timer.start()
+ self.logger.debug("Starting browser with settings %r" % self.browser_settings)
+ self.browser.start(**self.browser_settings)
+ self.browser_pid = self.browser.pid()
+ except:
+ self.logger.warning("Failure during init %s" % traceback.format_exc())
+ if self.init_timer is not None:
+ self.init_timer.cancel()
+ self.logger.error(traceback.format_exc())
+ succeeded = False
+ else:
+ succeeded = True
+ self.started = True
return succeeded
def send_message(self, command, *args):
self.command_queue.put((command, args))
def init_timeout(self):
# This is called from a seperate thread, so we send a message to the
@@ -232,31 +227,29 @@ class BrowserManager(object):
self.browser.log_crash(process=self.browser_pid, test=test_id)
def is_alive(self):
return self.browser.is_alive()
class _RunnerManagerState(object):
before_init = namedtuple("before_init", [])
- initalizing = namedtuple("initalizing_browser",
- ["test", "test_queue", "failure_count"])
- running = namedtuple("running", ["test", "test_queue"])
- restarting = namedtuple("restarting", ["test", "test_queue"])
+ initializing = namedtuple("initializing_browser",
+ ["test", "test_group", "failure_count"])
+ running = namedtuple("running", ["test", "test_group"])
+ restarting = namedtuple("restarting", ["test", "test_group"])
error = namedtuple("error", [])
stop = namedtuple("stop", [])
RunnerManagerState = _RunnerManagerState()
class TestRunnerManager(threading.Thread):
- init_lock = threading.Lock()
-
- def __init__(self, suite_name, tests, test_source_cls, browser_cls, browser_kwargs,
+ def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
executor_cls, executor_kwargs, stop_flag, pause_after_test=False,
pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None):
"""Thread that owns a single TestRunner process and any processes required
by the TestRunner (e.g. the Firefox binary).
TestRunnerManagers are responsible for launching the browser process and the
runner process, and for logging the test progress. The actual test running
is done by the TestRunner. In particular they:
@@ -266,28 +259,24 @@ class TestRunnerManager(threading.Thread
* Tell the TestRunner to start a test, if any
* Log that the test started
* Log the test results
* Take any remedial action required e.g. restart crashed or hung
processes
"""
self.suite_name = suite_name
- self.tests = tests
- self.test_source_cls = test_source_cls
- self.test_queue = None
+ self.test_source = test_source_cls(test_queue)
self.browser_cls = browser_cls
self.browser_kwargs = browser_kwargs
self.executor_cls = executor_cls
self.executor_kwargs = executor_kwargs
- self.test_source = None
-
# Flags used to shut down this thread if we get a sigint
self.parent_stop_flag = stop_flag
self.child_stop_flag = multiprocessing.Event()
self.pause_after_test = pause_after_test
self.pause_on_unexpected = pause_on_unexpected
self.restart_on_unexpected = restart_on_unexpected
self.debug_info = debug_info
@@ -316,25 +305,24 @@ class TestRunnerManager(threading.Thread
"""Main loop for the TestManager.
TestManagers generally receive commands from their
TestRunner updating them on the status of a test. They
may also have a stop flag set by the main thread indicating
that the manager should shut down the next time the event loop
spins."""
self.logger = structuredlog.StructuredLogger(self.suite_name)
- with self.browser_cls(self.logger, **self.browser_kwargs) as browser, self.test_source_cls(self.tests) as test_source:
+ with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
self.browser = BrowserManager(self.logger,
browser,
self.command_queue,
no_timeout=self.debug_info is not None)
- self.test_source = test_source
dispatch = {
RunnerManagerState.before_init: self.start_init,
- RunnerManagerState.initalizing: self.init,
+ RunnerManagerState.initializing: self.init,
RunnerManagerState.running: self.run_test,
RunnerManagerState.restarting: self.restart_runner
}
self.state = RunnerManagerState.before_init()
end_states = (RunnerManagerState.stop,
RunnerManagerState.error)
@@ -369,17 +357,17 @@ class TestRunnerManager(threading.Thread
clean = isinstance(self.state, RunnerManagerState.stop)
self.stop_runner(force=not clean)
self.teardown()
self.logger.debug("TestRunnerManager main loop terminated")
def wait_event(self):
dispatch = {
RunnerManagerState.before_init: {},
- RunnerManagerState.initalizing:
+ RunnerManagerState.initializing:
{
"init_succeeded": self.init_succeeded,
"init_failed": self.init_failed,
},
RunnerManagerState.running:
{
"test_ended": self.test_ended,
"wait_finished": self.wait_finished,
@@ -427,50 +415,49 @@ class TestRunnerManager(threading.Thread
f = (dispatch.get(self.state.__class__, {}).get(command) or
dispatch.get(None, {}).get(command))
if not f:
self.logger.warning("Got command %s in state %s" %
(command, self.state.__class__.__name__))
return
return f(*data)
-
def should_stop(self):
return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set()
def start_init(self):
- test, test_queue = self.get_next_test()
+ test, test_group = self.get_next_test()
if test is None:
return RunnerManagerState.stop()
else:
- return RunnerManagerState.initalizing(test, test_queue, 0)
+ return RunnerManagerState.initializing(test, test_group, 0)
def init(self):
- assert isinstance(self.state, RunnerManagerState.initalizing)
+ assert isinstance(self.state, RunnerManagerState.initializing)
if self.state.failure_count > self.max_restarts:
self.logger.error("Max restarts exceeded")
return RunnerManagerState.error()
self.browser.update_settings(self.state.test)
result = self.browser.init()
if result is Stop:
return RunnerManagerState.error()
elif not result:
- return RunnerManagerState.initalizing(self.state.test,
- self.state.test_queue,
- self.state.failure_count + 1)
+ return RunnerManagerState.initializing(self.state.test,
+ self.state.test_group,
+ self.state.failure_count + 1)
else:
self.start_test_runner()
def start_test_runner(self):
# Note that we need to be careful to start the browser before the
# test runner to ensure that any state set when the browser is started
# can be passed in to the test runner.
- assert isinstance(self.state, RunnerManagerState.initalizing)
+ assert isinstance(self.state, RunnerManagerState.initializing)
assert self.command_queue is not None
assert self.remote_queue is not None
self.logger.info("Starting runner")
executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()
args = (self.remote_queue,
self.command_queue,
self.executor_cls,
@@ -481,53 +468,48 @@ class TestRunnerManager(threading.Thread
self.test_runner_proc = Process(target=start_runner,
args=args,
name="Thread-TestRunner-%i" % self.manager_number)
self.test_runner_proc.start()
self.logger.debug("Test runner started")
# Now we wait for either an init_succeeded event or an init_failed event
def init_succeeded(self):
- assert isinstance(self.state, RunnerManagerState.initalizing)
+ assert isinstance(self.state, RunnerManagerState.initializing)
self.browser.after_init()
return RunnerManagerState.running(self.state.test,
- self.state.test_queue)
+ self.state.test_group)
def init_failed(self):
- assert isinstance(self.state, RunnerManagerState.initalizing)
+ assert isinstance(self.state, RunnerManagerState.initializing)
self.browser.after_init()
self.stop_runner(force=True)
- return RunnerManagerState.initalizing(self.state.test,
- self.state.test_queue,
- self.state.failure_count + 1)
+ return RunnerManagerState.initializing(self.state.test,
+ self.state.test_group,
+ self.state.failure_count + 1)
- def get_next_test(self, test_queue=None):
+ def get_next_test(self, test_group=None):
test = None
while test is None:
- if test_queue is None:
- test_queue = self.test_source.get_queue()
- if test_queue is None:
+ while test_group is None or len(test_group) == 0:
+ test_group = self.test_source.group()
+ if test_group is None:
self.logger.info("No more tests")
return None, None
- try:
- # Need to block here just to allow for contention with other processes
- test = test_queue.get(block=True, timeout=2)
- except Empty:
- if test_queue.empty():
- test_queue = None
- return test, test_queue
+ test = test_group.popleft()
+ return test, test_group
def run_test(self):
assert isinstance(self.state, RunnerManagerState.running)
assert self.state.test is not None
if self.browser.update_settings(self.state.test):
self.logger.info("Restarting browser for new test environment")
return RunnerManagerState.restarting(self.state.test,
- self.state.test_queue)
+ self.state.test_group)
self.logger.test_start(self.state.test.id)
self.send_message("run_test", self.state.test)
def test_ended(self, test, results):
"""Handle the end of a test.
Output the result of each subtest, and the result of the overall
@@ -590,32 +572,32 @@ class TestRunnerManager(threading.Thread
# The browser should be stopped already, but this ensures we do any post-stop
# processing
self.logger.debug("Wait finished")
return self.after_test_end(True)
def after_test_end(self, restart):
assert isinstance(self.state, RunnerManagerState.running)
- test, test_queue = self.get_next_test()
+ test, test_group = self.get_next_test()
if test is None:
return RunnerManagerState.stop()
- if test_queue != self.state.test_queue:
+ if test_group != self.state.test_group:
# We are starting a new group of tests, so force a restart
restart = True
if restart:
- return RunnerManagerState.restarting(test, test_queue)
+ return RunnerManagerState.restarting(test, test_group)
else:
- return RunnerManagerState.running(test, test_queue)
+ return RunnerManagerState.running(test, test_group)
def restart_runner(self):
"""Stop and restart the TestRunner"""
assert isinstance(self.state, RunnerManagerState.restarting)
self.stop_runner()
- return RunnerManagerState.initalizing(self.state.test, self.state.test_queue, 0)
+ return RunnerManagerState.initializing(self.state.test, self.state.test_group, 0)
def log(self, action, kwargs):
getattr(self.logger, action)(**kwargs)
def error(self, message):
self.logger.error(message)
self.restart_runner()
@@ -668,44 +650,26 @@ class TestRunnerManager(threading.Thread
if self.browser:
self.browser.cleanup()
while True:
try:
self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait())))
except Empty:
break
-class TestQueue(object):
- def __init__(self, test_source_cls, test_type, tests, **kwargs):
- self.queue = None
- self.test_source_cls = test_source_cls
- self.test_type = test_type
- self.tests = tests
- self.kwargs = kwargs
- def __enter__(self):
- if not self.tests[self.test_type]:
- return None
+def make_test_queue(tests, test_source_cls, **test_source_kwargs):
+ queue = test_source_cls.make_queue(tests, **test_source_kwargs)
- self.queue = Queue()
- has_tests = self.test_source_cls.queue_tests(self.queue,
- self.test_type,
- self.tests,
- **self.kwargs)
- # There is a race condition that means sometimes we continue
- # before the tests have been written to the underlying pipe.
- # Polling the pipe for data here avoids that
- self.queue._reader.poll(10)
- assert not self.queue.empty()
- return self.queue
-
- def __exit__(self, *args, **kwargs):
- if self.queue is not None:
- self.queue.close()
- self.queue = None
+ # There is a race condition that means sometimes we continue
+ # before the tests have been written to the underlying pipe.
+ # Polling the pipe for data here avoids that
+ queue._reader.poll(10)
+ assert not queue.empty()
+ return queue
class ManagerGroup(object):
def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
browser_cls, browser_kwargs,
executor_cls, executor_kwargs,
pause_after_test=False,
pause_on_unexpected=False,
@@ -725,52 +689,50 @@ class ManagerGroup(object):
self.restart_on_unexpected = restart_on_unexpected
self.debug_info = debug_info
self.pool = set()
# Event that is polled by threads so that they can gracefully exit in the face
# of sigint
self.stop_flag = threading.Event()
self.logger = structuredlog.StructuredLogger(suite_name)
- self.test_queue = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def run(self, test_type, tests):
"""Start all managers in the group"""
self.logger.debug("Using %i processes" % self.size)
- self.test_queue = TestQueue(self.test_source_cls,
- test_type,
- tests,
- **self.test_source_kwargs)
- with self.test_queue as test_queue:
- if test_queue is None:
- self.logger.info("No %s tests to run" % test_type)
- return
- for _ in range(self.size):
- manager = TestRunnerManager(self.suite_name,
- test_queue,
- self.test_source_cls,
- self.browser_cls,
- self.browser_kwargs,
- self.executor_cls,
- self.executor_kwargs,
- self.stop_flag,
- self.pause_after_test,
- self.pause_on_unexpected,
- self.restart_on_unexpected,
- self.debug_info)
- manager.start()
- self.pool.add(manager)
- self.wait()
+ type_tests = tests[test_type]
+ if type_tests is None:
+ self.logger.info("No %s tests to run" % test_type)
+ return
+
+ test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)
+
+ for _ in range(self.size):
+ manager = TestRunnerManager(self.suite_name,
+ test_queue,
+ self.test_source_cls,
+ self.browser_cls,
+ self.browser_kwargs,
+ self.executor_cls,
+ self.executor_kwargs,
+ self.stop_flag,
+ self.pause_after_test,
+ self.pause_on_unexpected,
+ self.restart_on_unexpected,
+ self.debug_info)
+ manager.start()
+ self.pool.add(manager)
+ self.wait()
def is_alive(self):
"""Boolean indicating whether any manager in the group is still alive"""
return any(manager.is_alive() for manager in self.pool)
def wait(self):
"""Wait for all the managers in the group to finish"""
for item in self.pool: