--- a/toolkit/modules/subprocess/subprocess_common.jsm
+++ b/toolkit/modules/subprocess/subprocess_common.jsm
@@ -10,16 +10,18 @@
/* exported BaseProcess, PromiseWorker */
var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.importGlobalProperties(["TextDecoder"]);
+XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
+ "resource://gre/modules/AsyncShutdown.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "setTimeout",
"resource://gre/modules/Timer.jsm");
Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker", "SubprocessConstants"];
const BUFFER_SIZE = 4096;
@@ -33,21 +35,35 @@ let nextResponseId = 0;
*/
class PromiseWorker extends ChromeWorker {
constructor(url) {
super(url);
this.listeners = new Map();
this.pendingResponses = new Map();
+ this.addListener("close", this.onClose.bind(this));
this.addListener("failure", this.onFailure.bind(this));
this.addListener("success", this.onSuccess.bind(this));
this.addListener("debug", this.onDebug.bind(this));
this.addEventListener("message", this.onmessage);
+
+ this.shutdown = this.shutdown.bind(this);
+ AsyncShutdown.webWorkersShutdown.addBlocker(
+ "Subprocess.jsm: Shut down IO worker",
+ this.shutdown);
+ }
+
+ onClose() {
+ AsyncShutdown.webWorkersShutdown.removeBlocker(this.shutdown);
+ }
+
+ shutdown() {
+ return this.call("shutdown", []);
}
/**
* Adds a listener for the given message from the worker. Any message received
* from the worker with a `data.msg` property matching the given `msg`
* parameter are passed to the given listener.
*
* @param {string} msg
@@ -610,25 +626,29 @@ class BaseProcess {
return new this(worker, processId, fds, pid);
});
}
static get WORKER_URL() {
throw new Error("Not implemented");
}
+ static get WorkerClass() {
+ return PromiseWorker;
+ }
+
/**
* Gets the current subprocess worker, or spawns a new one if it does not
* currently exist.
*
* @returns {PromiseWorker}
*/
static getWorker() {
if (!this._worker) {
- this._worker = new PromiseWorker(this.WORKER_URL);
+ this._worker = new this.WorkerClass(this.WORKER_URL);
}
return this._worker;
}
/**
* Kills the process.
*
* @param {integer} [timeout=300]
--- a/toolkit/modules/subprocess/subprocess_shared_win.js
+++ b/toolkit/modules/subprocess/subprocess_shared_win.js
@@ -17,16 +17,17 @@ var win32 = {
// On Windows 64, winapi_abi is an alias for default_abi.
WINAPI: ctypes.winapi_abi,
VOID: ctypes.void_t,
BYTE: ctypes.uint8_t,
WORD: ctypes.uint16_t,
DWORD: ctypes.uint32_t,
+ LONG: ctypes.long,
UINT: ctypes.unsigned_int,
UCHAR: ctypes.unsigned_char,
BOOL: ctypes.bool,
HANDLE: ctypes.voidptr_t,
PVOID: ctypes.voidptr_t,
@@ -214,16 +215,25 @@ var libc = new Library("libc", LIBC_CHOI
win32.BOOL, /* bInheritHandle */
win32.DWORD, /* dwCreationFlags */
win32.LPVOID, /* opt lpEnvironment */
win32.LPCWSTR, /* opt lpCurrentDirectory */
win32.STARTUPINFOW.ptr, /* lpStartupInfo */
win32.PROCESS_INFORMATION.ptr, /* out lpProcessInformation */
],
+ CreateSemaphoreW: [
+ win32.WINAPI,
+ win32.HANDLE,
+ win32.SECURITY_ATTRIBUTES.ptr, /* opt lpSemaphoreAttributes */
+ win32.LONG, /* lInitialCount */
+ win32.LONG, /* lMaximumCount */
+ win32.LPCWSTR, /* opt lpName */
+ ],
+
DeleteProcThreadAttributeList: [
win32.WINAPI,
win32.VOID,
win32.LPPROC_THREAD_ATTRIBUTE_LIST, /* in/out lpAttributeList */
],
DuplicateHandle: [
win32.WINAPI,
@@ -294,16 +304,24 @@ var libc = new Library("libc", LIBC_CHOI
win32.BOOL,
win32.HANDLE, /* hFile */
win32.LPVOID, /* out lpBuffer */
win32.DWORD, /* nNumberOfBytesToRead */
win32.LPDWORD, /* opt out lpNumberOfBytesRead */
win32.OVERLAPPED.ptr, /* opt in/out lpOverlapped */
],
+ ReleaseSemaphore: [
+ win32.WINAPI,
+ win32.BOOL,
+ win32.HANDLE, /* hSemaphore */
+ win32.LONG, /* lReleaseCount */
+ win32.LONG.ptr, /* opt out lpPreviousCount */
+ ],
+
TerminateProcess: [
win32.WINAPI,
win32.BOOL,
win32.HANDLE, /* hProcess */
win32.UINT, /* uExitCode */
],
UpdateProcThreadAttribute: [
--- a/toolkit/modules/subprocess/subprocess_unix.jsm
+++ b/toolkit/modules/subprocess/subprocess_unix.jsm
@@ -4,35 +4,82 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
/* eslint-disable mozilla/balanced-listeners */
/* exported SubprocessImpl */
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
var EXPORTED_SYMBOLS = ["SubprocessImpl"];
Cu.import("resource://gre/modules/ctypes.jsm");
Cu.import("resource://gre/modules/osfile.jsm");
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/Task.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_unix.js", this);
+class UnixPromiseWorker extends PromiseWorker {
+ constructor(...args) {
+ super(...args);
+
+ let fds = ctypes.int.array(2)();
+ let res = libc.pipe(fds);
+ if (res == -1) {
+ throw new Error("Unable to create pipe");
+ }
+
+ this.signalFd = fds[1];
+
+ libc.fcntl(fds[0], LIBC.F_SETFL, LIBC.O_NONBLOCK);
+ libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+ libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+
+ this.call("init", [{signalFd: fds[0]}]);
+ }
+
+ closePipe() {
+ if (this.signalFd) {
+ libc.close(this.signalFd);
+ this.signalFd = null;
+ }
+ }
+
+ onClose() {
+ this.closePipe();
+ super.onClose();
+ }
+
+ signalWorker() {
+ libc.write(this.signalFd, new ArrayBuffer(1), 1);
+ }
+
+ postMessage(...args) {
+ this.signalWorker();
+ return super.postMessage(...args);
+ }
+}
+
+
class Process extends BaseProcess {
static get WORKER_URL() {
return "resource://gre/modules/subprocess/subprocess_worker_unix.js";
}
+
+ static get WorkerClass() {
+ return UnixPromiseWorker;
+ }
}
var SubprocessUnix = {
Process,
call(options) {
return Process.create(options);
},
--- a/toolkit/modules/subprocess/subprocess_win.jsm
+++ b/toolkit/modules/subprocess/subprocess_win.jsm
@@ -4,45 +4,69 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
/* eslint-disable mozilla/balanced-listeners */
/* exported SubprocessImpl */
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
var EXPORTED_SYMBOLS = ["SubprocessImpl"];
Cu.import("resource://gre/modules/ctypes.jsm");
Cu.import("resource://gre/modules/osfile.jsm");
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/Task.jsm");
Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_win.js", this);
+class WinPromiseWorker extends PromiseWorker {
+ constructor(...args) {
+ super(...args);
+
+ this.signalEvent = libc.CreateSemaphoreW(null, 0, 32, null);
+
+ this.call("init", [{
+ signalEvent: String(ctypes.cast(this.signalEvent, ctypes.uintptr_t).value),
+ }]);
+ }
+
+ signalWorker() {
+ libc.ReleaseSemaphore(this.signalEvent, 1, null);
+ }
+
+ postMessage(...args) {
+ this.signalWorker();
+ return super.postMessage(...args);
+ }
+}
+
class Process extends BaseProcess {
static get WORKER_URL() {
return "resource://gre/modules/subprocess/subprocess_worker_win.js";
}
+
+ static get WorkerClass() {
+ return WinPromiseWorker;
+ }
}
var SubprocessWin = {
Process,
call(options) {
return Process.create(options);
},
-
* getEnvironment() {
let env = libc.GetEnvironmentStringsW();
try {
for (let p = env, q = env; ; p = p.increment()) {
if (p.contents == "\0") {
if (String(p) == String(q)) {
break;
}
--- a/toolkit/modules/subprocess/subprocess_worker_common.js
+++ b/toolkit/modules/subprocess/subprocess_worker_common.js
@@ -83,16 +83,28 @@ class BaseProcess {
// by the GC if it runs before our process is started.
this.stringArrays.push(cstrings);
return result;
}
}
let requests = {
+ init(details) {
+ io.init(details);
+
+ return {data: {}};
+ },
+
+ shutdown() {
+ io.shutdown();
+
+ return {data: {}};
+ },
+
close(pipeId, force = false) {
let pipe = io.getPipe(pipeId);
return pipe.close(force).then(() => ({data: {}}));
},
spawn(options) {
let process = new Process(options);
@@ -151,16 +163,18 @@ let requests = {
},
waitForNoProcesses() {
return Promise.all(Array.from(io.processes.values(), proc => proc.exitPromise));
},
};
onmessage = event => {
+ io.messageCount--;
+
let {msg, msgId, args} = event.data;
new Promise(resolve => {
resolve(requests[msg](...args));
}).then(result => {
let response = {
msg: "success",
msgId,
@@ -190,8 +204,14 @@ onmessage = event => {
self.postMessage({
msg: "failure",
msgId,
error: {},
});
});
};
+
+onclose = event => {
+ io.shutdown();
+
+ self.postMessage({msg: "close"});
+};
--- a/toolkit/modules/subprocess/subprocess_worker_unix.js
+++ b/toolkit/modules/subprocess/subprocess_worker_unix.js
@@ -7,18 +7,17 @@
/* exported Process */
/* globals BaseProcess, BasePipe */
importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
"resource://gre/modules/subprocess/subprocess_shared_unix.js",
"resource://gre/modules/subprocess/subprocess_worker_common.js");
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
let io;
let nextPipeId = 0;
class Pipe extends BasePipe {
constructor(process, fd) {
super();
@@ -243,16 +242,50 @@ class OutputPipe extends Pipe {
}
if (writes.length == 0) {
io.updatePollFds();
}
}
}
+class Signal {
+ constructor(fd) {
+ this.fd = fd;
+ }
+
+ cleanup() {
+ libc.close(this.fd);
+ this.fd = null;
+ }
+
+ get pollEvents() {
+ return LIBC.POLLIN;
+ }
+
+ /**
+ * Called when an error occurred while polling our file descriptor.
+ */
+ onError() {
+ io.shutdown();
+ }
+
+ /**
+ * Called when one of the IO operations matching the `pollEvents` mask may be
+ * performed without blocking.
+ */
+ onReady() {
+ let buffer = new ArrayBuffer(16);
+ let count = +libc.read(this.fd, buffer, buffer.byteLength);
+ if (count > 0) {
+ io.messageCount += count;
+ }
+ }
+}
+
class Process extends BaseProcess {
/**
* Each Process object opens an additional pipe from the target object, which
* will be automatically closed when the process exits, but otherwise
* carries no data.
*
* This property contains a bit mask of poll() events which we wish to be
* notified of on this descriptor. We're not expecting any input from this
@@ -444,17 +477,37 @@ class Process extends BaseProcess {
io = {
pollFds: null,
pollHandlers: null,
pipes: new Map(),
processes: new Map(),
- interval: null,
+ messageCount: 0,
+
+ running: true,
+
+ init(details) {
+ this.signal = new Signal(details.signalFd);
+ this.updatePollFds();
+
+ setTimeout(this.loop.bind(this), 0);
+ },
+
+ shutdown() {
+ if (this.running) {
+ this.running = false;
+
+ this.signal.cleanup();
+ this.signal = null;
+
+ self.close();
+ }
+ },
getPipe(pipeId) {
let pipe = this.pipes.get(pipeId);
if (!pipe) {
let error = new Error("File closed");
error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
throw error;
@@ -467,47 +520,49 @@ io = {
if (!process) {
throw new Error(`Invalid process ID: ${processId}`);
}
return process;
},
updatePollFds() {
- let handlers = [...this.pipes.values(),
+ let handlers = [this.signal,
+ ...this.pipes.values(),
...this.processes.values()];
handlers = handlers.filter(handler => handler.pollEvents);
let pollfds = unix.pollfd.array(handlers.length)();
for (let [i, handler] of handlers.entries()) {
let pollfd = pollfds[i];
pollfd.fd = handler.fd;
pollfd.events = handler.pollEvents;
pollfd.revents = 0;
}
this.pollFds = pollfds;
this.pollHandlers = handlers;
+ },
- if (pollfds.length && !this.interval) {
- this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
- } else if (!pollfds.length && this.interval) {
- clearInterval(this.interval);
- this.interval = null;
+ loop() {
+ this.poll();
+ if (this.running) {
+ setTimeout(this.loop.bind(this), 0);
}
},
poll() {
let handlers = this.pollHandlers;
let pollfds = this.pollFds;
- let count = libc.poll(pollfds, pollfds.length, POLL_TIMEOUT);
+ let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+ let count = libc.poll(pollfds, pollfds.length, timeout);
for (let i = 0; count && i < pollfds.length; i++) {
let pollfd = pollfds[i];
if (pollfd.revents) {
count--;
let handler = handlers[i];
try {
--- a/toolkit/modules/subprocess/subprocess_worker_win.js
+++ b/toolkit/modules/subprocess/subprocess_worker_win.js
@@ -7,18 +7,17 @@
/* exported Process */
/* globals BaseProcess, BasePipe, win32 */
importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
"resource://gre/modules/subprocess/subprocess_shared_win.js",
"resource://gre/modules/subprocess/subprocess_worker_common.js");
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
// The exit code that we send when we forcibly terminate a process.
const TERMINATE_EXIT_CODE = 0x7f;
let io;
let nextPipeId = 0;
@@ -293,16 +292,35 @@ class OutputPipe extends Pipe {
this.writeNext();
} else {
io.updatePollEvents();
}
}
}
}
+class Signal {
+ constructor(event) {
+ this.event = event;
+ }
+
+ cleanup() {
+ libc.CloseHandle(this.event);
+ this.event = null;
+ }
+
+ onError() {
+ io.shutdown();
+ }
+
+ onReady() {
+ io.messageCount += 1;
+ }
+}
+
class Process extends BaseProcess {
constructor(...args) {
super(...args);
this.killed = false;
}
/**
@@ -538,17 +556,39 @@ class Process extends BaseProcess {
io = {
events: null,
eventHandlers: null,
pipes: new Map(),
processes: new Map(),
- interval: null,
+ messageCount: 0,
+
+ running: true,
+
+ init(details) {
+ let signalEvent = ctypes.cast(ctypes.uintptr_t(details.signalEvent),
+ win32.HANDLE);
+ this.signal = new Signal(signalEvent);
+ this.updatePollEvents();
+
+ setTimeout(this.loop.bind(this), 0);
+ },
+
+ shutdown() {
+ if (this.running) {
+ this.running = false;
+
+ this.signal.cleanup();
+ this.signal = null;
+
+ self.close();
+ }
+ },
getPipe(pipeId) {
let pipe = this.pipes.get(pipeId);
if (!pipe) {
let error = new Error("File closed");
error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
throw error;
@@ -561,41 +601,44 @@ io = {
if (!process) {
throw new Error(`Invalid process ID: ${processId}`);
}
return process;
},
updatePollEvents() {
- let handlers = [...this.pipes.values(),
+ let handlers = [this.signal,
+ ...this.pipes.values(),
...this.processes.values()];
handlers = handlers.filter(handler => handler.event);
this.eventHandlers = handlers;
let handles = handlers.map(handler => handler.event);
this.events = win32.HANDLE.array()(handles);
+ },
- if (handles.length && !this.interval) {
- this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
- } else if (!handlers.length && this.interval) {
- clearInterval(this.interval);
- this.interval = null;
+ loop() {
+ this.poll();
+ if (this.running) {
+ setTimeout(this.loop.bind(this), 0);
}
},
+
poll() {
- for (;;) {
+ let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+ for (;; timeout = 0) {
let events = this.events;
let handlers = this.eventHandlers;
let result = libc.WaitForMultipleObjects(events.length, events,
- false, POLL_TIMEOUT);
+ false, timeout);
if (result < handlers.length) {
try {
handlers[result].onReady();
} catch (e) {
console.error(e);
debug(`Worker error: ${e} :: ${e.stack}`);
handlers[result].onError();
--- a/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
+++ b/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
@@ -1,16 +1,19 @@
"use strict";
+Cu.import("resource://gre/modules/AppConstants.jsm");
Cu.import("resource://gre/modules/Task.jsm");
Cu.import("resource://gre/modules/Timer.jsm");
const env = Cc["@mozilla.org/process/environment;1"].getService(Ci.nsIEnvironment);
+const MAX_ROUND_TRIP_TIME_MS = AppConstants.DEBUG ? 8 : 2;
+
let PYTHON;
let PYTHON_BIN;
let PYTHON_DIR;
const TEST_SCRIPT = do_get_file("data_test_script.py").path;
let read = pipe => {
return pipe.readUint32().then(count => {
@@ -173,16 +176,52 @@ add_task(function* test_subprocess_huge(
let {exitCode} = yield proc.wait();
equal(exitCode, 0, "Got expected exit code");
});
+add_task(function* test_subprocess_round_trip_perf() {
+ let proc = yield Subprocess.call({
+ command: PYTHON,
+ arguments: ["-u", TEST_SCRIPT, "echo"],
+ });
+
+
+ const LINE = "I'm a leaf on the wind.\n";
+
+ let now = Date.now();
+ const COUNT = 1000;
+ for (let i = 0; i < COUNT; i++) {
+ let [output] = yield Promise.all([
+ read(proc.stdout),
+ proc.stdin.write(LINE),
+ ]);
+
+ // We don't want to log this for every iteration, but we still need
+ // to fail if it goes wrong.
+ if (output !== LINE) {
+ equal(output, LINE, "Got expected output");
+ }
+ }
+
+ let roundTripTime = (Date.now() - now) / COUNT;
+ ok(roundTripTime <= MAX_ROUND_TRIP_TIME_MS,
+ `Expected round trip time (${roundTripTime}ms) to be less than ${MAX_ROUND_TRIP_TIME_MS}ms`);
+
+ yield proc.stdin.close();
+
+ let {exitCode} = yield proc.wait();
+
+ equal(exitCode, 0, "Got expected exit code");
+});
+
+
add_task(function* test_subprocess_stderr_default() {
const LINE1 = "I'm a leaf on the wind.\n";
const LINE2 = "Watch how I soar.\n";
let proc = yield Subprocess.call({
command: PYTHON,
arguments: ["-u", TEST_SCRIPT, "print", LINE1, LINE2],
});