Bug 1416313 - Drop old or low priority commands rather than failing to sync the clients engine if too many commands are sent. r?eoger,markh
authorThom Chiovoloni <tchiovoloni@mozilla.com>
Fri, 10 Nov 2017 15:57:33 -0500
changeset 697837 f8d6bbb2b739bf2c135510b18328dafddc0d6e6e
parent 697757 b0a6b4678b2f7dfb499328946b95366775f71edd
child 740223 212dcae60adee180e3b0c4a571df5351374fe9a8
push id 89113
push user bmo:tchiovoloni@mozilla.com
push date Tue, 14 Nov 2017 19:28:04 +0000
reviewers eoger, markh
bugs 1416313
milestone 59.0a1
Bug 1416313 - Drop old or low priority commands rather than failing to sync the clients engine if too many commands are sent. r?eoger,markh MozReview-Commit-ID: 6BJGmUL28hp
services/sync/modules/engines/clients.js
services/sync/modules/engines/tabs.js
services/sync/modules/service.js
services/sync/modules/util.js
services/sync/tests/unit/test_clients_engine.js
services/sync/tests/unit/test_tab_store.js
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -643,27 +643,28 @@ ClientEngine.prototype = {
 
     // Neither try again nor error; we're going to delete it.
     return SyncEngine.kRecoveryStrategy.ignore;
   },
 
   /**
    * A hash of valid commands that the client knows about. The key is a command
    * and the value is a hash containing information about the command such as
-   * number of arguments and description.
+   * number of arguments, description, and importance (lower importance
+   * numbers indicate higher importance).
    */
   _commands: {
-    resetAll:    { args: 0, desc: "Clear temporary local data for all engines" },
-    resetEngine: { args: 1, desc: "Clear temporary local data for engine" },
-    wipeAll:     { args: 0, desc: "Delete all client data for all engines" },
-    wipeEngine:  { args: 1, desc: "Delete all client data for engine" },
-    logout:      { args: 0, desc: "Log out client" },
-    displayURI:  { args: 3, desc: "Instruct a client to display a URI" },
-    repairRequest:  {args: 1, desc: "Instruct a client to initiate a repair"},
-    repairResponse: {args: 1, desc: "Instruct a client a repair request is complete"},
+    resetAll:    { args: 0, importance: 0, desc: "Clear temporary local data for all engines" },
+    resetEngine: { args: 1, importance: 0, desc: "Clear temporary local data for engine" },
+    wipeAll:     { args: 0, importance: 0, desc: "Delete all client data for all engines" },
+    wipeEngine:  { args: 1, importance: 0, desc: "Delete all client data for engine" },
+    logout:      { args: 0, importance: 0, desc: "Log out client" },
+    displayURI:  { args: 3, importance: 1, desc: "Instruct a client to display a URI" },
+    repairRequest:  { args: 1, importance: 2, desc: "Instruct a client to initiate a repair" },
+    repairResponse: { args: 1, importance: 2, desc: "Instruct a client a repair request is complete" },
   },
 
   /**
    * Sends a command+args pair to a specific client.
    *
    * @param command Command string
    * @param args Array of arguments/data for command
    * @param clientId Client to send command to
@@ -878,17 +879,17 @@ ClientEngine.prototype = {
    *        URI (as a string) to send and display on the remote client
    * @param clientId
    *        ID of client to send the command to. If not defined, will be sent
    *        to all remote clients.
    * @param title
    *        Title of the page being sent.
    */
   async sendURIToClientForDisplay(uri, clientId, title) {
-    this._log.info("Sending URI to client: " + uri + " -> " +
+    this._log.trace("Sending URI to client: " + uri + " -> " +
                    clientId + " (" + title + ")");
     await this.sendCommand("displayURI", [uri, this.localID, title], clientId);
 
     this._tracker.score += SCORE_INCREMENT_XLARGE;
   },
 
   /**
    * Handle a bunch of received 'displayURI' commands.
@@ -992,17 +993,49 @@ ClientStore.prototype = {
 
       if (record.cleartext.stale) {
         // It's almost certainly a logic error for us to upload a record we
         // consider stale, so make log noise, but still remove the flag.
         this._log.error(`Preparing to upload record ${id} that we consider stale`);
         delete record.cleartext.stale;
       }
     }
-
+    if (record.commands) {
+      const maxPayloadSize = this.engine.service.getMemcacheMaxRecordPayloadSize();
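+      // Remember each command's original position so that we can restore the
+      // original ordering after sorting and truncating.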
+      let origOrder = new Map(record.commands.map((c, i) => [c, i]));
+      // We sort first by importance, and then by age (indicated by position in
+      // the original list).
+      let commands = record.commands.slice().sort((a, b) => {
+        let infoA = this.engine._commands[a.command];
+        let infoB = this.engine._commands[b.command];
+        // Treat unknown command types as highest priority, to allow us to add
+        // high priority commands in the future without worrying about clients
+        // removing them from each other unnecessarily.
+        let importA = infoA ? infoA.importance : 0;
+        let importB = infoB ? infoB.importance : 0;
+        // Higher importance numbers indicate that we care less, so they
+        // go to the end of the list where they'll be popped off.
+        let importDelta = importA - importB;
+        if (importDelta != 0) {
+          return importDelta;
+        }
+        let origIdxA = origOrder.get(a);
+        let origIdxB = origOrder.get(b);
+        // Within equivalent priorities, we put older entries near the end
+        // of the list, so that they are removed first.
+        return origIdxB - origIdxA;
+      });
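+      // tryFitItems keeps a prefix of this sorted list, so the least important
+      // and oldest commands are the ones dropped if we're over the size limit.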
+      let truncatedCommands = Utils.tryFitItems(commands, maxPayloadSize);
+      if (truncatedCommands.length != record.commands.length) {
+        this._log.warn(`Removing commands from client ${id} (from ${record.commands.length} to ${truncatedCommands.length})`);
+        // Restore original order.
+        record.commands = truncatedCommands.sort((a, b) =>
+          origOrder.get(a) - origOrder.get(b));
+      }
+    }
     return record;
   },
 
   async itemExists(id) {
     return id in (await this.getAllIDs());
   },
 
   async getAllIDs() {
--- a/services/sync/modules/engines/tabs.js
+++ b/services/sync/modules/engines/tabs.js
@@ -6,16 +6,17 @@ this.EXPORTED_SYMBOLS = ["TabEngine", "T
 
 var {classes: Cc, interfaces: Ci, utils: Cu} = Components;
 
 const TABS_TTL = 1814400; // 21 days.
 const TAB_ENTRIES_LIMIT = 5; // How many URLs to include in tab history.
 
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
+Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://services-sync/constants.js");
 
 XPCOMUtils.defineLazyModuleGetter(this, "PrivateBrowsingUtils",
   "resource://gre/modules/PrivateBrowsingUtils.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "SessionStore",
@@ -200,55 +201,39 @@ TabStore.prototype = {
           lastUsed: Math.floor((tabState.lastAccessed || 0) / 1000),
         });
       }
     }
 
     return allTabs;
   },
 
-  getMaxRecordPayloadSize() {
-    // Tabs have a different max size due to being stored using memcached on the
-    // server (See bug 1403052), but we still check the server config to make
-    // sure we respect the global limits it sets.
-    return Math.min(512 * 1024, this.engine.service.getMaxRecordPayloadSize());
-  },
-
   async createRecord(id, collection) {
     let record = new TabSetRecord(collection, id);
     record.clientName = this.engine.service.clientsEngine.localName;
 
     // Sort tabs in descending-used order to grab the most recently used
     let tabs = this.getAllTabs(true).sort(function(a, b) {
       return b.lastUsed - a.lastUsed;
     });
-    let encoder = new TextEncoder("utf-8");
-    // Figure out how many tabs we can pack into a payload.
-    // We use byteLength here because the data is not encrypted in ascii yet.
-    let size = encoder.encode(JSON.stringify(tabs)).byteLength;
-    let origLength = tabs.length;
-    const maxPayloadSize = this.getMaxRecordPayloadSize();
-    // See bug 535326 comment 8 for an explanation of the estimation
-    const MAX_TAB_SIZE = maxPayloadSize / 4 * 3 - 1500;
-    if (size > MAX_TAB_SIZE) {
-      // Estimate a little more than the direct fraction to maximize packing
-      let cutoff = Math.ceil(tabs.length * MAX_TAB_SIZE / size);
-      tabs = tabs.slice(0, cutoff + 1);
+    const maxPayloadSize = this.engine.service.getMemcacheMaxRecordPayloadSize();
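+    // Drop the least recently used tabs until the serialized record fits
+    // within the payload limit.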
+    let records = Utils.tryFitItems(tabs, maxPayloadSize);
 
-      // Keep dropping off the last entry until the data fits
-      while (encoder.encode(JSON.stringify(tabs)).byteLength > MAX_TAB_SIZE)
-        tabs.pop();
+    if (records.length != tabs.length) {
+      this._log.warn(`Can't fit all tabs in sync payload: have ${tabs.length}, ` +
+                     `but can only fit ${records.length}.`);
     }
 
-    this._log.trace("Created tabs " + tabs.length + " of " + origLength);
-    tabs.forEach(function(tab) {
-      this._log.trace("Wrapping tab: " + JSON.stringify(tab));
-    }, this);
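+    // Avoid the per-tab logging loop entirely unless trace logging is enabled.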
+    if (this._log.level <= Log.Level.Trace) {
+      records.forEach(tab => {
+        this._log.trace("Wrapping tab: ", tab);
+      });
+    }
 
-    record.tabs = tabs;
+    record.tabs = records;
     return record;
   },
 
   async getAllIDs() {
     // Don't report any tabs if all windows are in private browsing for
     // first syncs.
     let ids = {};
     let allWindowsArePrivate = false;
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -611,16 +611,27 @@ Sync11Service.prototype = {
     }
     let payloadMax = config.max_record_payload_bytes;
     if (config.max_post_bytes && payloadMax <= config.max_post_bytes) {
       return config.max_post_bytes - 4096;
     }
     return payloadMax;
   },
 
+  getMemcacheMaxRecordPayloadSize() {
+    // Collections stored in memcached ("tabs", "clients" or "meta") have a
+    // different max size than ones stored in the normal storage server db.
+    // In practice, the real limit here is 1M (bug 1300451 comment 40), but
+    // there's overhead involved that is hard to calculate on the client, so we
+    // use 512k to be safe (at the recommendation of the server team). Note
+    // that if the server reports a lower limit (via info/configuration), we
+    // respect that limit instead. See also bug 1403052.
+    return Math.min(512 * 1024, this.getMaxRecordPayloadSize());
+  },
+
   async verifyLogin(allow40XRecovery = true) {
     if (!this.identity.username) {
       this._log.warn("No username in verifyLogin.");
       this.status.login = LOGIN_FAILED_NO_USERNAME;
       return false;
     }
 
     // Attaching auth credentials to a request requires access to
--- a/services/sync/modules/util.js
+++ b/services/sync/modules/util.js
@@ -363,16 +363,52 @@ this.Utils = {
     }
 
     let json = typeof obj == "function" ? obj.call(that) : obj;
 
     return CommonUtils.writeJSON(json, path);
   },
 
   /**
+   * Helper utility function to fit an array of records so that when serialized,
+   * they will be within payloadSizeMaxBytes. Returns a new array without the
+   * items that did not fit.
+   */
+  tryFitItems(records, payloadSizeMaxBytes) {
+    // Copy this so that callers don't have to do it in advance.
+    records = records.slice();
+    let encoder = Utils.utf8Encoder;
+    const computeSerializedSize = () =>
+      encoder.encode(JSON.stringify(records)).byteLength;
+    // Figure out how many records we can pack into a payload.
+    // We use byteLength here because the data is not yet encrypted into ASCII,
+    // so it may contain multi-byte characters.
+    let size = computeSerializedSize();
+    // See bug 535326 comment 8 for an explanation of the estimation
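+    // (Roughly: the serialized record is encrypted and base64-encoded before
+    // upload, which inflates it by about 4/3, plus some fixed overhead for the
+    // crypto envelope; hence the 3/4 factor and the 1500 byte allowance.)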
+    const maxSerializedSize = payloadSizeMaxBytes / 4 * 3 - 1500;
+    if (maxSerializedSize < 0) {
+      // This is probably due to a test, but it causes very bad behavior if a
+      // test causes this accidentally. We could throw, but there's an obvious/
+      // natural way to handle it, so we do that instead (otherwise we'd have a
+      // weird lower bound of ~2000 bytes on the max record payload size).
+      return [];
+    }
+    if (size > maxSerializedSize) {
+      // Estimate a little more than the direct fraction to maximize packing
+      let cutoff = Math.ceil(records.length * maxSerializedSize / size);
+      records = records.slice(0, cutoff + 1);
+
+      // Keep dropping off the last entry until the data fits.
+      while (computeSerializedSize() > maxSerializedSize) {
+        records.pop();
+      }
+    }
+    return records;
+  },
+
+  /**
    * Move a json file in the profile directory. Will fail if a file exists at the
    * destination.
    *
    * @returns a promise that resolves to undefined on success, or rejects on failure
    *
    * @param aFrom
    *        Current path to the JSON file saved on disk, relative to profileDir/weave
    *        .json will be appended to the file name.
@@ -618,16 +654,19 @@ this.Utils = {
 
 XPCOMUtils.defineLazyGetter(Utils, "_utf8Converter", function() {
   let converter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
                     .createInstance(Ci.nsIScriptableUnicodeConverter);
   converter.charset = "UTF-8";
   return converter;
 });
 
+XPCOMUtils.defineLazyGetter(Utils, "utf8Encoder", () =>
+  new TextEncoder("utf-8"));
+
 /*
  * Commonly-used services
  */
 this.Svc = {};
 Svc.Prefs = new Preferences(PREFS_BRANCH);
 Svc.Obs = Observers;
 
 Svc.Obs.add("xpcom-shutdown", function() {
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -1807,13 +1807,94 @@ add_task(async function process_incoming
   // Let's say we failed to upload and we end up calling processIncomingCommands again
   await engine.processIncomingCommands();
   ok(tabProcessedSpy.calledOnce);
 
   tabProcessedSpy.restore();
   stubRemoveLocalCommand.restore();
 });
 
+add_task(async function test_create_record_command_limit() {
+  await engine._store.wipe();
+  await generateNewKeys(Service.collectionKeys);
+
+  let server = await serverForFoo(engine);
+  await SyncTestingInfrastructure(server);
+
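+  // Deliberately small, so the flood of commands sent below can't all fit.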
+  const fakeLimit = 4 * 1024;
+
+  let maxSizeStub = sinon.stub(Service,
+    "getMemcacheMaxRecordPayloadSize", () => fakeLimit);
+
+  let user = server.user("foo");
+  let remoteId = Utils.makeGUID();
+
+  _("Create remote client record");
+  server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
+    id: remoteId,
+    name: "Remote client",
+    type: "desktop",
+    commands: [],
+    version: "57",
+    protocols: ["1.5"],
+  }), Date.now() / 1000));
+
+  try {
+    _("Initial sync.");
+    await syncClientsEngine(server);
+
+    _("Send a fairly sane number of commands.");
+
+    for (let i = 0; i < 5; ++i) {
+      await engine.sendURIToClientForDisplay(
+        `https://www.example.com/1/${i}`, remoteId, `Page 1.${i}`);
+    }
+
+    await syncClientsEngine(server);
+
+    _("Make sure they all fit and weren't dropped.");
+    let parsedServerRecord = JSON.parse(JSON.parse(
+      user.collection("clients").payload(remoteId)).ciphertext);
+
+    equal(parsedServerRecord.commands.length, 5);
+
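+    _("Send a high-priority command; it should survive truncation later.");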
+    await engine.sendCommand("wipeEngine", ["history"], remoteId);
+
+    _("Send a not-sane number of commands.");
+    // Much higher than the maximum number of commands we could actually fit.
+    for (let i = 0; i < 500; ++i) {
+      await engine.sendURIToClientForDisplay(
+        `https://www.example.com/2/${i}`, remoteId, `Page 2.${i}`);
+    }
+
+    await syncClientsEngine(server);
+
+    _("Ensure we didn't overflow the server limit.");
+    let payload = user.collection("clients").payload(remoteId);
+    less(payload.length, fakeLimit);
+
+    _("And that the data we uploaded is both sane json and containing some commands.")
+    let remoteCommands = JSON.parse(JSON.parse(payload).ciphertext).commands;
+    greater(remoteCommands.length, 2);
+    let firstCommand = remoteCommands[0];
+    _("The first command should still be present, since it had a high priority")
+    equal(firstCommand.command, "wipeEngine");
+    _("And the last command in the list should be the last command we sent.");
+    let lastCommand = remoteCommands[remoteCommands.length - 1];
+    equal(lastCommand.command, "displayURI");
+    deepEqual(lastCommand.args, ["https://www.example.com/2/499", engine.localID, "Page 2.499"]);
+  } finally {
+    maxSizeStub.restore();
+    await cleanup();
+    try {
+      let collection = server.getCollection("foo", "clients");
+      collection.remove(remoteId);
+    } finally {
+      await promiseStopServer(server);
+    }
+  }
+});
+
 function run_test() {
   initTestLogging("Trace");
   Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
   run_next_test();
 }
--- a/services/sync/tests/unit/test_tab_store.js
+++ b/services/sync/tests/unit/test_tab_store.js
@@ -104,16 +104,20 @@ add_task(async function test_createRecor
   equal(record.tabs.length, 1);
 
   _("create a big record");
   store.getWindowEnumerator = mockGetWindowEnumerator.bind(this, "http://foo.com", 1, numtabs);
   record = await store.createRecord("fake-guid");
   ok(record instanceof TabSetRecord);
   equal(record.tabs.length, 2501);
 
-  store.getMaxRecordPayloadSize = () => 512 * 1024;
-  numtabs = 5200;
-  _("Modify the max record payload size and create a big record");
-  store.getWindowEnumerator = mockGetWindowEnumerator.bind(this, "http://foo.com", 1, numtabs);
-  record = await store.createRecord("fake-guid");
-  ok(record instanceof TabSetRecord);
-  equal(record.tabs.length, 5021);
+  let maxSizeStub = sinon.stub(Service, "getMemcacheMaxRecordPayloadSize", () => 512 * 1024);
+  try {
+    numtabs = 5200;
+    _("Modify the max record payload size and create a big record");
+    store.getWindowEnumerator = mockGetWindowEnumerator.bind(this, "http://foo.com", 1, numtabs);
+    record = await store.createRecord("fake-guid");
+    ok(record instanceof TabSetRecord);
+    equal(record.tabs.length, 5021);
+  } finally {
+    maxSizeStub.restore();
+  }
 });