Bug 1276390 - Use blocking polling in worker to handle subprocess IO. r?aswan draft
authorKris Maglione <maglione.k@gmail.com>
Wed, 27 Jul 2016 15:14:57 -0700
changeset 393506 ae701c467dcf00353ee8eb5ec32aab4f916ad9fd
parent 393505 43aa99ffc2d9891b31f232e2c2a9aa24e0f722dc
child 526589 84d08a03a8337c6b493e9e06a620636b9700d68c
push id24319
push usermaglione.k@gmail.com
push dateWed, 27 Jul 2016 22:52:29 +0000
reviewersaswan
bugs1276390
milestone50.0a1
Bug 1276390 - Use blocking polling in worker to handle subprocess IO. r?aswan MozReview-Commit-ID: KXqgCLnO7dR
toolkit/modules/subprocess/subprocess_common.jsm
toolkit/modules/subprocess/subprocess_shared_win.js
toolkit/modules/subprocess/subprocess_unix.jsm
toolkit/modules/subprocess/subprocess_win.jsm
toolkit/modules/subprocess/subprocess_worker_common.js
toolkit/modules/subprocess/subprocess_worker_unix.js
toolkit/modules/subprocess/subprocess_worker_win.js
toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
--- a/toolkit/modules/subprocess/subprocess_common.jsm
+++ b/toolkit/modules/subprocess/subprocess_common.jsm
@@ -10,16 +10,18 @@
 /* exported BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.importGlobalProperties(["TextDecoder"]);
 
+XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
+                                  "resource://gre/modules/AsyncShutdown.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "setTimeout",
                                   "resource://gre/modules/Timer.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 
 var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker", "SubprocessConstants"];
 
 const BUFFER_SIZE = 4096;
@@ -33,21 +35,35 @@ let nextResponseId = 0;
  */
 class PromiseWorker extends ChromeWorker {
   constructor(url) {
     super(url);
 
     this.listeners = new Map();
     this.pendingResponses = new Map();
 
+    this.addListener("close", this.onClose.bind(this));
     this.addListener("failure", this.onFailure.bind(this));
     this.addListener("success", this.onSuccess.bind(this));
     this.addListener("debug", this.onDebug.bind(this));
 
     this.addEventListener("message", this.onmessage);
+
+    this.shutdown = this.shutdown.bind(this);
+    AsyncShutdown.webWorkersShutdown.addBlocker(
+      "Subprocess.jsm: Shut down IO worker",
+      this.shutdown);
+  }
+
+  onClose() {
+    AsyncShutdown.webWorkersShutdown.removeBlocker(this.shutdown);
+  }
+
+  shutdown() {
+    return this.call("shutdown", []);
   }
 
   /**
    * Adds a listener for the given message from the worker. Any message received
    * from the worker with a `data.msg` property matching the given `msg`
    * parameter are passed to the given listener.
    *
    * @param {string} msg
@@ -610,25 +626,29 @@ class BaseProcess {
       return new this(worker, processId, fds, pid);
     });
   }
 
   static get WORKER_URL() {
     throw new Error("Not implemented");
   }
 
+  static get WorkerClass() {
+    return PromiseWorker;
+  }
+
   /**
    * Gets the current subprocess worker, or spawns a new one if it does not
    * currently exist.
    *
    * @returns {PromiseWorker}
    */
   static getWorker() {
     if (!this._worker) {
-      this._worker = new PromiseWorker(this.WORKER_URL);
+      this._worker = new this.WorkerClass(this.WORKER_URL);
     }
     return this._worker;
   }
 
   /**
    * Kills the process.
    *
    * @param {integer} [timeout=300]
--- a/toolkit/modules/subprocess/subprocess_shared_win.js
+++ b/toolkit/modules/subprocess/subprocess_shared_win.js
@@ -17,16 +17,17 @@ var win32 = {
   // On Windows 64, winapi_abi is an alias for default_abi.
   WINAPI: ctypes.winapi_abi,
 
   VOID: ctypes.void_t,
 
   BYTE: ctypes.uint8_t,
   WORD: ctypes.uint16_t,
   DWORD: ctypes.uint32_t,
+  LONG: ctypes.long,
 
   UINT: ctypes.unsigned_int,
   UCHAR: ctypes.unsigned_char,
 
   BOOL: ctypes.bool,
 
   HANDLE: ctypes.voidptr_t,
   PVOID: ctypes.voidptr_t,
@@ -214,16 +215,25 @@ var libc = new Library("libc", LIBC_CHOI
     win32.BOOL, /* bInheritHandle */
     win32.DWORD, /* dwCreationFlags */
     win32.LPVOID, /* opt lpEnvironment */
     win32.LPCWSTR, /* opt lpCurrentDirectory */
     win32.STARTUPINFOW.ptr, /* lpStartupInfo */
     win32.PROCESS_INFORMATION.ptr, /* out lpProcessInformation */
   ],
 
+  CreateSemaphoreW: [
+    win32.WINAPI,
+    win32.HANDLE,
+    win32.SECURITY_ATTRIBUTES.ptr, /* opt lpSemaphoreAttributes */
+    win32.LONG, /* lInitialCount */
+    win32.LONG, /* lMaximumCount */
+    win32.LPCWSTR, /* opt lpName */
+  ],
+
   DeleteProcThreadAttributeList: [
     win32.WINAPI,
     win32.VOID,
     win32.LPPROC_THREAD_ATTRIBUTE_LIST, /* in/out lpAttributeList */
   ],
 
   DuplicateHandle: [
     win32.WINAPI,
@@ -294,16 +304,24 @@ var libc = new Library("libc", LIBC_CHOI
     win32.BOOL,
     win32.HANDLE, /* hFile */
     win32.LPVOID, /* out lpBuffer */
     win32.DWORD, /* nNumberOfBytesToRead */
     win32.LPDWORD, /* opt out lpNumberOfBytesRead */
     win32.OVERLAPPED.ptr, /* opt in/out lpOverlapped */
   ],
 
+  ReleaseSemaphore: [
+    win32.WINAPI,
+    win32.BOOL,
+    win32.HANDLE, /* hSemaphore */
+    win32.LONG, /* lReleaseCount */
+    win32.LONG.ptr, /* opt out lpPreviousCount */
+  ],
+
   TerminateProcess: [
     win32.WINAPI,
     win32.BOOL,
     win32.HANDLE, /* hProcess */
     win32.UINT, /* uExitCode */
   ],
 
   UpdateProcThreadAttribute: [
--- a/toolkit/modules/subprocess/subprocess_unix.jsm
+++ b/toolkit/modules/subprocess/subprocess_unix.jsm
@@ -4,35 +4,82 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 "use strict";
 
 /* eslint-disable mozilla/balanced-listeners */
 
 /* exported SubprocessImpl */
 
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 var EXPORTED_SYMBOLS = ["SubprocessImpl"];
 
 Cu.import("resource://gre/modules/ctypes.jsm");
 Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_unix.js", this);
 
+class UnixPromiseWorker extends PromiseWorker {
+  constructor(...args) {
+    super(...args);
+
+    let fds = ctypes.int.array(2)();
+    let res = libc.pipe(fds);
+    if (res == -1) {
+      throw new Error("Unable to create pipe");
+    }
+
+    this.signalFd = fds[1];
+
+    libc.fcntl(fds[0], LIBC.F_SETFL, LIBC.O_NONBLOCK);
+    libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+    libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+
+    this.call("init", [{signalFd: fds[0]}]);
+  }
+
+  closePipe() {
+    if (this.signalFd) {
+      libc.close(this.signalFd);
+      this.signalFd = null;
+    }
+  }
+
+  onClose() {
+    this.closePipe();
+    super.onClose();
+  }
+
+  signalWorker() {
+    libc.write(this.signalFd, new ArrayBuffer(1), 1);
+  }
+
+  postMessage(...args) {
+    this.signalWorker();
+    return super.postMessage(...args);
+  }
+}
+
+
 class Process extends BaseProcess {
   static get WORKER_URL() {
     return "resource://gre/modules/subprocess/subprocess_worker_unix.js";
   }
+
+  static get WorkerClass() {
+    return UnixPromiseWorker;
+  }
 }
 
 var SubprocessUnix = {
   Process,
 
   call(options) {
     return Process.create(options);
   },
--- a/toolkit/modules/subprocess/subprocess_win.jsm
+++ b/toolkit/modules/subprocess/subprocess_win.jsm
@@ -4,45 +4,69 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 "use strict";
 
 /* eslint-disable mozilla/balanced-listeners */
 
 /* exported SubprocessImpl */
 
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 var EXPORTED_SYMBOLS = ["SubprocessImpl"];
 
 Cu.import("resource://gre/modules/ctypes.jsm");
 Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_win.js", this);
 
+class WinPromiseWorker extends PromiseWorker {
+  constructor(...args) {
+    super(...args);
+
+    this.signalEvent = libc.CreateSemaphoreW(null, 0, 32, null);
+
+    this.call("init", [{
+      signalEvent: String(ctypes.cast(this.signalEvent, ctypes.uintptr_t).value),
+    }]);
+  }
+
+  signalWorker() {
+    libc.ReleaseSemaphore(this.signalEvent, 1, null);
+  }
+
+  postMessage(...args) {
+    this.signalWorker();
+    return super.postMessage(...args);
+  }
+}
+
 class Process extends BaseProcess {
   static get WORKER_URL() {
     return "resource://gre/modules/subprocess/subprocess_worker_win.js";
   }
+
+  static get WorkerClass() {
+    return WinPromiseWorker;
+  }
 }
 
 var SubprocessWin = {
   Process,
 
   call(options) {
     return Process.create(options);
   },
 
-
   * getEnvironment() {
     let env = libc.GetEnvironmentStringsW();
     try {
       for (let p = env, q = env; ; p = p.increment()) {
         if (p.contents == "\0") {
           if (String(p) == String(q)) {
             break;
           }
--- a/toolkit/modules/subprocess/subprocess_worker_common.js
+++ b/toolkit/modules/subprocess/subprocess_worker_common.js
@@ -83,16 +83,28 @@ class BaseProcess {
     // by the GC if it runs before our process is started.
     this.stringArrays.push(cstrings);
 
     return result;
   }
 }
 
 let requests = {
+  init(details) {
+    io.init(details);
+
+    return {data: {}};
+  },
+
+  shutdown() {
+    io.shutdown();
+
+    return {data: {}};
+  },
+
   close(pipeId, force = false) {
     let pipe = io.getPipe(pipeId);
 
     return pipe.close(force).then(() => ({data: {}}));
   },
 
   spawn(options) {
     let process = new Process(options);
@@ -151,16 +163,18 @@ let requests = {
   },
 
   waitForNoProcesses() {
     return Promise.all(Array.from(io.processes.values(), proc => proc.exitPromise));
   },
 };
 
 onmessage = event => {
+  io.messageCount--;
+
   let {msg, msgId, args} = event.data;
 
   new Promise(resolve => {
     resolve(requests[msg](...args));
   }).then(result => {
     let response = {
       msg: "success",
       msgId,
@@ -190,8 +204,14 @@ onmessage = event => {
 
     self.postMessage({
       msg: "failure",
       msgId,
       error: {},
     });
   });
 };
+
+onclose = event => {
+  io.shutdown();
+
+  self.postMessage({msg: "close"});
+};
--- a/toolkit/modules/subprocess/subprocess_worker_unix.js
+++ b/toolkit/modules/subprocess/subprocess_worker_unix.js
@@ -7,18 +7,17 @@
 
 /* exported Process */
 /* globals BaseProcess, BasePipe */
 
 importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
               "resource://gre/modules/subprocess/subprocess_shared_unix.js",
               "resource://gre/modules/subprocess/subprocess_worker_common.js");
 
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
 
 let io;
 
 let nextPipeId = 0;
 
 class Pipe extends BasePipe {
   constructor(process, fd) {
     super();
@@ -243,16 +242,50 @@ class OutputPipe extends Pipe {
     }
 
     if (writes.length == 0) {
       io.updatePollFds();
     }
   }
 }
 
+class Signal {
+  constructor(fd) {
+    this.fd = fd;
+  }
+
+  cleanup() {
+    libc.close(this.fd);
+    this.fd = null;
+  }
+
+  get pollEvents() {
+    return LIBC.POLLIN;
+  }
+
+  /**
+   * Called when an error occurred while polling our file descriptor.
+   */
+  onError() {
+    io.shutdown();
+  }
+
+  /**
+   * Called when one of the IO operations matching the `pollEvents` mask may be
+   * performed without blocking.
+   */
+  onReady() {
+    let buffer = new ArrayBuffer(16);
+    let count = +libc.read(this.fd, buffer, buffer.byteLength);
+    if (count > 0) {
+      io.messageCount += count;
+    }
+  }
+}
+
 class Process extends BaseProcess {
   /**
    * Each Process object opens an additional pipe from the target object, which
    * will be automatically closed when the process exits, but otherwise
    * carries no data.
    *
    * This property contains a bit mask of poll() events which we wish to be
    * notified of on this descriptor. We're not expecting any input from this
@@ -444,17 +477,37 @@ class Process extends BaseProcess {
 io = {
   pollFds: null,
   pollHandlers: null,
 
   pipes: new Map(),
 
   processes: new Map(),
 
-  interval: null,
+  messageCount: 0,
+
+  running: true,
+
+  init(details) {
+    this.signal = new Signal(details.signalFd);
+    this.updatePollFds();
+
+    setTimeout(this.loop.bind(this), 0);
+  },
+
+  shutdown() {
+    if (this.running) {
+      this.running = false;
+
+      this.signal.cleanup();
+      this.signal = null;
+
+      self.close();
+    }
+  },
 
   getPipe(pipeId) {
     let pipe = this.pipes.get(pipeId);
 
     if (!pipe) {
       let error = new Error("File closed");
       error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
       throw error;
@@ -467,47 +520,49 @@ io = {
 
     if (!process) {
       throw new Error(`Invalid process ID: ${processId}`);
     }
     return process;
   },
 
   updatePollFds() {
-    let handlers = [...this.pipes.values(),
+    let handlers = [this.signal,
+                    ...this.pipes.values(),
                     ...this.processes.values()];
 
     handlers = handlers.filter(handler => handler.pollEvents);
 
     let pollfds = unix.pollfd.array(handlers.length)();
 
     for (let [i, handler] of handlers.entries()) {
       let pollfd = pollfds[i];
 
       pollfd.fd = handler.fd;
       pollfd.events = handler.pollEvents;
       pollfd.revents = 0;
     }
 
     this.pollFds = pollfds;
     this.pollHandlers = handlers;
+  },
 
-    if (pollfds.length && !this.interval) {
-      this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
-    } else if (!pollfds.length && this.interval) {
-      clearInterval(this.interval);
-      this.interval = null;
+  loop() {
+    this.poll();
+    if (this.running) {
+      setTimeout(this.loop.bind(this), 0);
     }
   },
 
   poll() {
     let handlers = this.pollHandlers;
     let pollfds = this.pollFds;
 
-    let count = libc.poll(pollfds, pollfds.length, POLL_TIMEOUT);
+    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+    let count = libc.poll(pollfds, pollfds.length, timeout);
 
     for (let i = 0; count && i < pollfds.length; i++) {
       let pollfd = pollfds[i];
       if (pollfd.revents) {
         count--;
 
         let handler = handlers[i];
         try {
--- a/toolkit/modules/subprocess/subprocess_worker_win.js
+++ b/toolkit/modules/subprocess/subprocess_worker_win.js
@@ -7,18 +7,17 @@
 
 /* exported Process */
 /* globals BaseProcess, BasePipe, win32 */
 
 importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
               "resource://gre/modules/subprocess/subprocess_shared_win.js",
               "resource://gre/modules/subprocess/subprocess_worker_common.js");
 
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
 
 // The exit code that we send when we forcibly terminate a process.
 const TERMINATE_EXIT_CODE = 0x7f;
 
 let io;
 
 let nextPipeId = 0;
 
@@ -293,16 +292,35 @@ class OutputPipe extends Pipe {
         this.writeNext();
       } else {
         io.updatePollEvents();
       }
     }
   }
 }
 
+class Signal {
+  constructor(event) {
+    this.event = event;
+  }
+
+  cleanup() {
+    libc.CloseHandle(this.event);
+    this.event = null;
+  }
+
+  onError() {
+    io.shutdown();
+  }
+
+  onReady() {
+    io.messageCount += 1;
+  }
+}
+
 class Process extends BaseProcess {
   constructor(...args) {
     super(...args);
 
     this.killed = false;
   }
 
   /**
@@ -538,17 +556,39 @@ class Process extends BaseProcess {
 io = {
   events: null,
   eventHandlers: null,
 
   pipes: new Map(),
 
   processes: new Map(),
 
-  interval: null,
+  messageCount: 0,
+
+  running: true,
+
+  init(details) {
+    let signalEvent = ctypes.cast(ctypes.uintptr_t(details.signalEvent),
+                                  win32.HANDLE);
+    this.signal = new Signal(signalEvent);
+    this.updatePollEvents();
+
+    setTimeout(this.loop.bind(this), 0);
+  },
+
+  shutdown() {
+    if (this.running) {
+      this.running = false;
+
+      this.signal.cleanup();
+      this.signal = null;
+
+      self.close();
+    }
+  },
 
   getPipe(pipeId) {
     let pipe = this.pipes.get(pipeId);
 
     if (!pipe) {
       let error = new Error("File closed");
       error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
       throw error;
@@ -561,41 +601,44 @@ io = {
 
     if (!process) {
       throw new Error(`Invalid process ID: ${processId}`);
     }
     return process;
   },
 
   updatePollEvents() {
-    let handlers = [...this.pipes.values(),
+    let handlers = [this.signal,
+                    ...this.pipes.values(),
                     ...this.processes.values()];
 
     handlers = handlers.filter(handler => handler.event);
 
     this.eventHandlers = handlers;
 
     let handles = handlers.map(handler => handler.event);
     this.events = win32.HANDLE.array()(handles);
+  },
 
-    if (handles.length && !this.interval) {
-      this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
-    } else if (!handlers.length && this.interval) {
-      clearInterval(this.interval);
-      this.interval = null;
+  loop() {
+    this.poll();
+    if (this.running) {
+      setTimeout(this.loop.bind(this), 0);
     }
   },
 
+
   poll() {
-    for (;;) {
+    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+    for (;; timeout = 0) {
       let events = this.events;
       let handlers = this.eventHandlers;
 
       let result = libc.WaitForMultipleObjects(events.length, events,
-                                               false, POLL_TIMEOUT);
+                                               false, timeout);
 
       if (result < handlers.length) {
         try {
           handlers[result].onReady();
         } catch (e) {
           console.error(e);
           debug(`Worker error: ${e} :: ${e.stack}`);
           handlers[result].onError();
--- a/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
+++ b/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
@@ -1,16 +1,19 @@
 "use strict";
 
+Cu.import("resource://gre/modules/AppConstants.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/Timer.jsm");
 
 
 const env = Cc["@mozilla.org/process/environment;1"].getService(Ci.nsIEnvironment);
 
+const MAX_ROUND_TRIP_TIME_MS = AppConstants.DEBUG ? 8 : 2;
+
 let PYTHON;
 let PYTHON_BIN;
 let PYTHON_DIR;
 
 const TEST_SCRIPT = do_get_file("data_test_script.py").path;
 
 let read = pipe => {
   return pipe.readUint32().then(count => {
@@ -173,16 +176,52 @@ add_task(function* test_subprocess_huge(
 
 
   let {exitCode} = yield proc.wait();
 
   equal(exitCode, 0, "Got expected exit code");
 });
 
 
+add_task(function* test_subprocess_round_trip_perf() {
+  let proc = yield Subprocess.call({
+    command: PYTHON,
+    arguments: ["-u", TEST_SCRIPT, "echo"],
+  });
+
+
+  const LINE = "I'm a leaf on the wind.\n";
+
+  let now = Date.now();
+  const COUNT = 1000;
+  for (let i = 0; i < COUNT; i++) {
+    let [output] = yield Promise.all([
+      read(proc.stdout),
+      proc.stdin.write(LINE),
+    ]);
+
+    // We don't want to log this for every iteration, but we still need
+    // to fail if it goes wrong.
+    if (output !== LINE) {
+      equal(output, LINE, "Got expected output");
+    }
+  }
+
+  let roundTripTime = (Date.now() - now) / COUNT;
+  ok(roundTripTime <= MAX_ROUND_TRIP_TIME_MS,
+     `Expected round trip time (${roundTripTime}ms) to be less than ${MAX_ROUND_TRIP_TIME_MS}ms`);
+
+  yield proc.stdin.close();
+
+  let {exitCode} = yield proc.wait();
+
+  equal(exitCode, 0, "Got expected exit code");
+});
+
+
 add_task(function* test_subprocess_stderr_default() {
   const LINE1 = "I'm a leaf on the wind.\n";
   const LINE2 = "Watch how I soar.\n";
 
   let proc = yield Subprocess.call({
     command: PYTHON,
     arguments: ["-u", TEST_SCRIPT, "print", LINE1, LINE2],
   });