--- a/testing/mochitest/runtests.py
+++ b/testing/mochitest/runtests.py
@@ -512,30 +512,269 @@ class WebSocketServer(object):
self._process.run()
pid = self._process.pid
self._log.info("runtests.py | Websocket server pid: %d" % pid)
def stop(self):
self._process.kill()
-class MochitestBase(object):
+class SSLTunnel:
+
+ def __init__(self, options, logger, ignoreSSLTunnelExts=False):
+ self.log = logger
+ self.process = None
+ self.utilityPath = options.utilityPath
+ self.xrePath = options.xrePath
+ self.certPath = options.certPath
+ self.sslPort = options.sslPort
+ self.httpPort = options.httpPort
+ self.webServer = options.webServer
+ self.webSocketPort = options.webSocketPort
+ self.useSSLTunnelExts = not ignoreSSLTunnelExts
+
+ self.customCertRE = re.compile("^cert=(?P<nickname>[0-9a-zA-Z_ ]+)")
+ self.clientAuthRE = re.compile("^clientauth=(?P<clientauth>[a-z]+)")
+ self.redirRE = re.compile("^redir=(?P<redirhost>[0-9a-zA-Z_ .]+)")
+
+ def writeLocation(self, config, loc):
+ for option in loc.options:
+ match = self.customCertRE.match(option)
+ if match:
+ customcert = match.group("nickname")
+ config.write("listen:%s:%s:%s:%s\n" %
+ (loc.host, loc.port, self.sslPort, customcert))
+
+ match = self.clientAuthRE.match(option)
+ if match:
+ clientauth = match.group("clientauth")
+ config.write("clientauth:%s:%s:%s:%s\n" %
+ (loc.host, loc.port, self.sslPort, clientauth))
+
+ match = self.redirRE.match(option)
+ if match:
+ redirhost = match.group("redirhost")
+ config.write("redirhost:%s:%s:%s:%s\n" %
+ (loc.host, loc.port, self.sslPort, redirhost))
+
+ if self.useSSLTunnelExts and option in (
+ 'tls1',
+ 'ssl3',
+ 'rc4',
+ 'failHandshake'):
+ config.write(
+ "%s:%s:%s:%s\n" %
+ (option, loc.host, loc.port, self.sslPort))
+
+ def buildConfig(self, locations):
+ """Create the ssltunnel configuration file"""
+ configFd, self.configFile = tempfile.mkstemp(
+ prefix="ssltunnel", suffix=".cfg")
+ with os.fdopen(configFd, "w") as config:
+ config.write("httpproxy:1\n")
+ config.write("certdbdir:%s\n" % self.certPath)
+ config.write("forward:127.0.0.1:%s\n" % self.httpPort)
+ config.write(
+ "websocketserver:%s:%s\n" %
+ (self.webServer, self.webSocketPort))
+ config.write("listen:*:%s:pgo server certificate\n" % self.sslPort)
+
+ for loc in locations:
+ if loc.scheme == "https" and "nocert" not in loc.options:
+ self.writeLocation(config, loc)
+
+ def start(self):
+ """ Starts the SSL Tunnel """
+
+ # start ssltunnel to provide https:// URLs capability
+ bin_suffix = mozinfo.info.get('bin_suffix', '')
+ ssltunnel = os.path.join(self.utilityPath, "ssltunnel" + bin_suffix)
+ if not os.path.exists(ssltunnel):
+ self.log.error(
+ "INFO | runtests.py | expected to find ssltunnel at %s" %
+ ssltunnel)
+ exit(1)
+
+ env = test_environment(xrePath=self.xrePath, log=self.log)
+ env["LD_LIBRARY_PATH"] = self.xrePath
+ self.process = mozprocess.ProcessHandler([ssltunnel, self.configFile],
+ env=env)
+ self.process.run()
+ self.log.info("runtests.py | SSL tunnel pid: %d" % self.process.pid)
+
+ def stop(self):
+ """ Stops the SSL Tunnel and cleans up """
+ if self.process is not None:
+ self.process.kill()
+ if os.path.exists(self.configFile):
+ os.remove(self.configFile)
+
+
+def checkAndConfigureV4l2loopback(device):
+ '''
+ Determine if a given device path is a v4l2loopback device, and if so
+ toggle a few settings on it via fcntl. Very linux-specific.
+
+ Returns (status, device name) where status is a boolean.
+ '''
+ if not mozinfo.isLinux:
+ return False, ''
+
+ libc = ctypes.cdll.LoadLibrary('libc.so.6')
+ O_RDWR = 2
+ # These are from linux/videodev2.h
+
+ class v4l2_capability(ctypes.Structure):
+ _fields_ = [
+ ('driver', ctypes.c_char * 16),
+ ('card', ctypes.c_char * 32),
+ ('bus_info', ctypes.c_char * 32),
+ ('version', ctypes.c_uint32),
+ ('capabilities', ctypes.c_uint32),
+ ('device_caps', ctypes.c_uint32),
+ ('reserved', ctypes.c_uint32 * 3)
+ ]
+ VIDIOC_QUERYCAP = 0x80685600
+
+ fd = libc.open(device, O_RDWR)
+ if fd < 0:
+ return False, ''
+
+ vcap = v4l2_capability()
+ if libc.ioctl(fd, VIDIOC_QUERYCAP, ctypes.byref(vcap)) != 0:
+ return False, ''
+
+ if vcap.driver != 'v4l2 loopback':
+ return False, ''
+
+ class v4l2_control(ctypes.Structure):
+ _fields_ = [
+ ('id', ctypes.c_uint32),
+ ('value', ctypes.c_int32)
+ ]
+
+ # These are private v4l2 control IDs, see:
+ # https://github.com/umlaeute/v4l2loopback/blob/fd822cf0faaccdf5f548cddd9a5a3dcebb6d584d/v4l2loopback.c#L131
+ KEEP_FORMAT = 0x8000000
+ SUSTAIN_FRAMERATE = 0x8000001
+ VIDIOC_S_CTRL = 0xc008561c
+
+ control = v4l2_control()
+ control.id = KEEP_FORMAT
+ control.value = 1
+ libc.ioctl(fd, VIDIOC_S_CTRL, ctypes.byref(control))
+
+ control.id = SUSTAIN_FRAMERATE
+ control.value = 1
+ libc.ioctl(fd, VIDIOC_S_CTRL, ctypes.byref(control))
+ libc.close(fd)
+
+ return True, vcap.card
+
+
+def findTestMediaDevices(log):
+ '''
+ Find the test media devices configured on this system, and return a dict
+ containing information about them. The dict will have keys for 'audio'
+ and 'video', each containing the name of the media device to use.
+
+ If audio and video devices could not be found, return None.
+
+ This method is only currently implemented for Linux.
+ '''
+ if not mozinfo.isLinux:
+ return None
+
+ info = {}
+ # Look for a v4l2loopback device.
+ name = None
+ device = None
+ for dev in sorted(glob.glob('/dev/video*')):
+ result, name_ = checkAndConfigureV4l2loopback(dev)
+ if result:
+ name = name_
+ device = dev
+ break
+
+ if not (name and device):
+ log.error('Couldn\'t find a v4l2loopback video device')
+ return None
+
+ # Feed it a frame of output so it has something to display
+ subprocess.check_call(['/usr/bin/gst-launch-0.10', 'videotestsrc',
+ 'pattern=green', 'num-buffers=1', '!',
+ 'v4l2sink', 'device=%s' % device])
+ info['video'] = name
+
+ # Use pactl to see if the PulseAudio module-sine-source module is loaded.
+ def sine_source_loaded():
+ o = subprocess.check_output(
+ ['/usr/bin/pactl', 'list', 'short', 'modules'])
+ return filter(lambda x: 'module-sine-source' in x, o.splitlines())
+
+ if not sine_source_loaded():
+ # Load module-sine-source
+ subprocess.check_call(['/usr/bin/pactl', 'load-module',
+ 'module-sine-source'])
+ if not sine_source_loaded():
+ log.error('Couldn\'t load module-sine-source')
+ return None
+
+ # Hardcode the name since it's always the same.
+ info['audio'] = 'Sine source at 440 Hz'
+ return info
+
+
+class KeyValueParseError(Exception):
+
+ """error when parsing strings of serialized key-values"""
+
+ def __init__(self, msg, errors=()):
+ self.errors = errors
+ Exception.__init__(self, msg)
+
+
+def parseKeyValue(strings, separator='=', context='key, value: '):
"""
- Base mochitest class for desktop.
+ parse string-serialized key-value pairs in the form of
+ `key = value`. Returns a list of 2-tuples.
+ Note that whitespace is not stripped.
"""
+ # syntax check
+ missing = [string for string in strings if separator not in string]
+ if missing:
+ raise KeyValueParseError(
+ "Error: syntax error in %s" %
+ (context, ','.join(missing)), errors=missing)
+ return [string.split(separator, 1) for string in strings]
+
+
+class MochitestDesktop(object):
+ """
+ Mochitest class for desktop firefox.
+ """
oldcwd = os.getcwd()
mochijar = os.path.join(SCRIPT_DIR, 'mochijar')
# Path to the test script on the server
TEST_PATH = "tests"
NESTED_OOP_TEST_PATH = "nested_oop"
CHROME_PATH = "redirect.html"
log = None
+ certdbNew = False
+ sslTunnel = None
+ DEFAULT_TIMEOUT = 60.0
+ mediaDevices = None
+
+ # XXX use automation.py for test name to avoid breaking legacy
+ # TODO: replace this with 'runtests.py' or 'mochitest' or the like
+ test_name = 'automation.py'
+
def __init__(self, logger_options):
self.update_mozinfo()
self.server = None
self.wsserver = None
self.websocketProcessBridge = None
self.sslTunnel = None
self._active_tests = None
self._locations = None
@@ -550,19 +789,37 @@ class MochitestBase(object):
commandline.log_formatters["tbpl"] = (
MochitestFormatter,
"Mochitest specific tbpl formatter")
self.log = commandline.setup_logging("mochitest",
logger_options,
{
"tbpl": sys.stdout
})
- MochitestBase.log = self.log
+ MochitestDesktop.log = self.log
self.message_logger = MessageLogger(logger=self.log)
+ # Max time in seconds to wait for server startup before tests will fail -- if
+ # this seems big, it's mostly for debug machines where cold startup
+ # (particularly after a build) takes forever.
+ self.SERVER_STARTUP_TIMEOUT = 180 if mozinfo.info.get('debug') else 90
+
+ # metro browser sub process id
+ self.browserProcessId = None
+
+ self.haveDumpedScreen = False
+ # Create variables to count the number of passes, fails, todos.
+ self.countpass = 0
+ self.countfail = 0
+ self.counttodo = 0
+
+ self.expectedError = {}
+ self.result = {}
+
+ self.start_script = os.path.join(here, 'start_desktop.js')
def update_mozinfo(self):
"""walk up directories to find mozinfo.json update the info"""
# TODO: This should go in a more generic place, e.g. mozinfo
path = SCRIPT_DIR
dirs = set()
while path != os.path.expanduser('~'):
@@ -572,16 +829,25 @@ class MochitestBase(object):
path = os.path.split(path)[0]
mozinfo.find_and_update_from_json(*dirs)
def environment(self, **kwargs):
kwargs['log'] = self.log
return test_environment(**kwargs)
+ def extraPrefs(self, extraPrefs):
+ """interpolate extra preferences from option strings"""
+
+ try:
+ return dict(parseKeyValue(extraPrefs, context='--setpref='))
+ except KeyValueParseError as e:
+ print str(e)
+ sys.exit(1)
+
def getFullPath(self, path):
" Get an absolute path relative to self.oldcwd."
return os.path.normpath(
os.path.join(
self.oldcwd,
os.path.expanduser(path)))
def getLogFilePath(self, logFile):
@@ -1280,292 +1546,16 @@ toolbar#nav-bar {
with open(self.start_script, 'r') as fh:
script = fh.read()
else:
script = self.start_script
with self.marionette.using_context('chrome'):
return self.marionette.execute_script(script, script_args=self.start_script_args)
-
-class SSLTunnel:
-
- def __init__(self, options, logger, ignoreSSLTunnelExts=False):
- self.log = logger
- self.process = None
- self.utilityPath = options.utilityPath
- self.xrePath = options.xrePath
- self.certPath = options.certPath
- self.sslPort = options.sslPort
- self.httpPort = options.httpPort
- self.webServer = options.webServer
- self.webSocketPort = options.webSocketPort
- self.useSSLTunnelExts = not ignoreSSLTunnelExts
-
- self.customCertRE = re.compile("^cert=(?P<nickname>[0-9a-zA-Z_ ]+)")
- self.clientAuthRE = re.compile("^clientauth=(?P<clientauth>[a-z]+)")
- self.redirRE = re.compile("^redir=(?P<redirhost>[0-9a-zA-Z_ .]+)")
-
- def writeLocation(self, config, loc):
- for option in loc.options:
- match = self.customCertRE.match(option)
- if match:
- customcert = match.group("nickname")
- config.write("listen:%s:%s:%s:%s\n" %
- (loc.host, loc.port, self.sslPort, customcert))
-
- match = self.clientAuthRE.match(option)
- if match:
- clientauth = match.group("clientauth")
- config.write("clientauth:%s:%s:%s:%s\n" %
- (loc.host, loc.port, self.sslPort, clientauth))
-
- match = self.redirRE.match(option)
- if match:
- redirhost = match.group("redirhost")
- config.write("redirhost:%s:%s:%s:%s\n" %
- (loc.host, loc.port, self.sslPort, redirhost))
-
- if self.useSSLTunnelExts and option in (
- 'tls1',
- 'ssl3',
- 'rc4',
- 'failHandshake'):
- config.write(
- "%s:%s:%s:%s\n" %
- (option, loc.host, loc.port, self.sslPort))
-
- def buildConfig(self, locations):
- """Create the ssltunnel configuration file"""
- configFd, self.configFile = tempfile.mkstemp(
- prefix="ssltunnel", suffix=".cfg")
- with os.fdopen(configFd, "w") as config:
- config.write("httpproxy:1\n")
- config.write("certdbdir:%s\n" % self.certPath)
- config.write("forward:127.0.0.1:%s\n" % self.httpPort)
- config.write(
- "websocketserver:%s:%s\n" %
- (self.webServer, self.webSocketPort))
- config.write("listen:*:%s:pgo server certificate\n" % self.sslPort)
-
- for loc in locations:
- if loc.scheme == "https" and "nocert" not in loc.options:
- self.writeLocation(config, loc)
-
- def start(self):
- """ Starts the SSL Tunnel """
-
- # start ssltunnel to provide https:// URLs capability
- bin_suffix = mozinfo.info.get('bin_suffix', '')
- ssltunnel = os.path.join(self.utilityPath, "ssltunnel" + bin_suffix)
- if not os.path.exists(ssltunnel):
- self.log.error(
- "INFO | runtests.py | expected to find ssltunnel at %s" %
- ssltunnel)
- exit(1)
-
- env = test_environment(xrePath=self.xrePath, log=self.log)
- env["LD_LIBRARY_PATH"] = self.xrePath
- self.process = mozprocess.ProcessHandler([ssltunnel, self.configFile],
- env=env)
- self.process.run()
- self.log.info("runtests.py | SSL tunnel pid: %d" % self.process.pid)
-
- def stop(self):
- """ Stops the SSL Tunnel and cleans up """
- if self.process is not None:
- self.process.kill()
- if os.path.exists(self.configFile):
- os.remove(self.configFile)
-
-
-def checkAndConfigureV4l2loopback(device):
- '''
- Determine if a given device path is a v4l2loopback device, and if so
- toggle a few settings on it via fcntl. Very linux-specific.
-
- Returns (status, device name) where status is a boolean.
- '''
- if not mozinfo.isLinux:
- return False, ''
-
- libc = ctypes.cdll.LoadLibrary('libc.so.6')
- O_RDWR = 2
- # These are from linux/videodev2.h
-
- class v4l2_capability(ctypes.Structure):
- _fields_ = [
- ('driver', ctypes.c_char * 16),
- ('card', ctypes.c_char * 32),
- ('bus_info', ctypes.c_char * 32),
- ('version', ctypes.c_uint32),
- ('capabilities', ctypes.c_uint32),
- ('device_caps', ctypes.c_uint32),
- ('reserved', ctypes.c_uint32 * 3)
- ]
- VIDIOC_QUERYCAP = 0x80685600
-
- fd = libc.open(device, O_RDWR)
- if fd < 0:
- return False, ''
-
- vcap = v4l2_capability()
- if libc.ioctl(fd, VIDIOC_QUERYCAP, ctypes.byref(vcap)) != 0:
- return False, ''
-
- if vcap.driver != 'v4l2 loopback':
- return False, ''
-
- class v4l2_control(ctypes.Structure):
- _fields_ = [
- ('id', ctypes.c_uint32),
- ('value', ctypes.c_int32)
- ]
-
- # These are private v4l2 control IDs, see:
- # https://github.com/umlaeute/v4l2loopback/blob/fd822cf0faaccdf5f548cddd9a5a3dcebb6d584d/v4l2loopback.c#L131
- KEEP_FORMAT = 0x8000000
- SUSTAIN_FRAMERATE = 0x8000001
- VIDIOC_S_CTRL = 0xc008561c
-
- control = v4l2_control()
- control.id = KEEP_FORMAT
- control.value = 1
- libc.ioctl(fd, VIDIOC_S_CTRL, ctypes.byref(control))
-
- control.id = SUSTAIN_FRAMERATE
- control.value = 1
- libc.ioctl(fd, VIDIOC_S_CTRL, ctypes.byref(control))
- libc.close(fd)
-
- return True, vcap.card
-
-
-def findTestMediaDevices(log):
- '''
- Find the test media devices configured on this system, and return a dict
- containing information about them. The dict will have keys for 'audio'
- and 'video', each containing the name of the media device to use.
-
- If audio and video devices could not be found, return None.
-
- This method is only currently implemented for Linux.
- '''
- if not mozinfo.isLinux:
- return None
-
- info = {}
- # Look for a v4l2loopback device.
- name = None
- device = None
- for dev in sorted(glob.glob('/dev/video*')):
- result, name_ = checkAndConfigureV4l2loopback(dev)
- if result:
- name = name_
- device = dev
- break
-
- if not (name and device):
- log.error('Couldn\'t find a v4l2loopback video device')
- return None
-
- # Feed it a frame of output so it has something to display
- subprocess.check_call(['/usr/bin/gst-launch-0.10', 'videotestsrc',
- 'pattern=green', 'num-buffers=1', '!',
- 'v4l2sink', 'device=%s' % device])
- info['video'] = name
-
- # Use pactl to see if the PulseAudio module-sine-source module is loaded.
- def sine_source_loaded():
- o = subprocess.check_output(
- ['/usr/bin/pactl', 'list', 'short', 'modules'])
- return filter(lambda x: 'module-sine-source' in x, o.splitlines())
-
- if not sine_source_loaded():
- # Load module-sine-source
- subprocess.check_call(['/usr/bin/pactl', 'load-module',
- 'module-sine-source'])
- if not sine_source_loaded():
- log.error('Couldn\'t load module-sine-source')
- return None
-
- # Hardcode the name since it's always the same.
- info['audio'] = 'Sine source at 440 Hz'
- return info
-
-
-class KeyValueParseError(Exception):
-
- """error when parsing strings of serialized key-values"""
-
- def __init__(self, msg, errors=()):
- self.errors = errors
- Exception.__init__(self, msg)
-
-
-def parseKeyValue(strings, separator='=', context='key, value: '):
- """
- parse string-serialized key-value pairs in the form of
- `key = value`. Returns a list of 2-tuples.
- Note that whitespace is not stripped.
- """
-
- # syntax check
- missing = [string for string in strings if separator not in string]
- if missing:
- raise KeyValueParseError(
- "Error: syntax error in %s" %
- (context, ','.join(missing)), errors=missing)
- return [string.split(separator, 1) for string in strings]
-
-
-class MochitestDesktop(MochitestBase):
- """
- Mochitest class for desktop firefox.
- """
- certdbNew = False
- sslTunnel = None
- DEFAULT_TIMEOUT = 60.0
- mediaDevices = None
-
- # XXX use automation.py for test name to avoid breaking legacy
- # TODO: replace this with 'runtests.py' or 'mochitest' or the like
- test_name = 'automation.py'
-
- def __init__(self, *args, **kwargs):
- MochitestBase.__init__(self, *args, **kwargs)
-
- # Max time in seconds to wait for server startup before tests will fail -- if
- # this seems big, it's mostly for debug machines where cold startup
- # (particularly after a build) takes forever.
- self.SERVER_STARTUP_TIMEOUT = 180 if mozinfo.info.get('debug') else 90
-
- # metro browser sub process id
- self.browserProcessId = None
-
- self.haveDumpedScreen = False
- # Create variables to count the number of passes, fails, todos.
- self.countpass = 0
- self.countfail = 0
- self.counttodo = 0
-
- self.expectedError = {}
- self.result = {}
-
- self.start_script = os.path.join(here, 'start_desktop.js')
-
- def extraPrefs(self, extraPrefs):
- """interpolate extra preferences from option strings"""
-
- try:
- return dict(parseKeyValue(extraPrefs, context='--setpref='))
- except KeyValueParseError as e:
- print str(e)
- sys.exit(1)
-
def fillCertificateDB(self, options):
# TODO: move -> mozprofile:
# https://bugzilla.mozilla.org/show_bug.cgi?id=746243#c35
pwfilePath = os.path.join(options.profilePath, ".crtdbpw")
with open(pwfilePath, "w") as pwfile:
pwfile.write("\n")