Bug 1416313 - Drop old or low priority commands rather than failing to sync the clients engine if too many commands are sent. r?eoger,markh
MozReview-Commit-ID: 6BJGmUL28hp
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -643,27 +643,28 @@ ClientEngine.prototype = {
// Neither try again nor error; we're going to delete it.
return SyncEngine.kRecoveryStrategy.ignore;
},
/**
* A hash of valid commands that the client knows about. The key is a command
* and the value is a hash containing information about the command such as
- * number of arguments and description.
+ * number of arguments, description, and importance (lower importance numbers
+ * indicate higher importance.
*/
_commands: {
- resetAll: { args: 0, desc: "Clear temporary local data for all engines" },
- resetEngine: { args: 1, desc: "Clear temporary local data for engine" },
- wipeAll: { args: 0, desc: "Delete all client data for all engines" },
- wipeEngine: { args: 1, desc: "Delete all client data for engine" },
- logout: { args: 0, desc: "Log out client" },
- displayURI: { args: 3, desc: "Instruct a client to display a URI" },
- repairRequest: {args: 1, desc: "Instruct a client to initiate a repair"},
- repairResponse: {args: 1, desc: "Instruct a client a repair request is complete"},
+ resetAll: { args: 0, importance: 0, desc: "Clear temporary local data for all engines" },
+ resetEngine: { args: 1, importance: 0, desc: "Clear temporary local data for engine" },
+ wipeAll: { args: 0, importance: 0, desc: "Delete all client data for all engines" },
+ wipeEngine: { args: 1, importance: 0, desc: "Delete all client data for engine" },
+ logout: { args: 0, importance: 0, desc: "Log out client" },
+ displayURI: { args: 3, importance: 1, desc: "Instruct a client to display a URI" },
+ repairRequest: { args: 1, importance: 2, desc: "Instruct a client to initiate a repair" },
+ repairResponse: { args: 1, importance: 2, desc: "Instruct a client a repair request is complete" },
},
/**
* Sends a command+args pair to a specific client.
*
* @param command Command string
* @param args Array of arguments/data for command
* @param clientId Client to send command to
@@ -878,17 +879,17 @@ ClientEngine.prototype = {
* URI (as a string) to send and display on the remote client
* @param clientId
* ID of client to send the command to. If not defined, will be sent
* to all remote clients.
* @param title
* Title of the page being sent.
*/
async sendURIToClientForDisplay(uri, clientId, title) {
- this._log.info("Sending URI to client: " + uri + " -> " +
+ this._log.trace("Sending URI to client: " + uri + " -> " +
clientId + " (" + title + ")");
await this.sendCommand("displayURI", [uri, this.localID, title], clientId);
this._tracker.score += SCORE_INCREMENT_XLARGE;
},
/**
* Handle a bunch of received 'displayURI' commands.
@@ -992,17 +993,49 @@ ClientStore.prototype = {
if (record.cleartext.stale) {
// It's almost certainly a logic error for us to upload a record we
// consider stale, so make log noise, but still remove the flag.
this._log.error(`Preparing to upload record ${id} that we consider stale`);
delete record.cleartext.stale;
}
}
-
+ if (record.commands) {
+ const maxPayloadSize = this.engine.service.getMemcacheMaxRecordPayloadSize();
+ let origOrder = new Map(record.commands.map((c, i) => [c, i]));
+ // we sort first by priority, and second by age (indicated by order in the
+ // original list)
+ let commands = record.commands.slice().sort((a, b) => {
+ let infoA = this.engine._commands[a.command];
+ let infoB = this.engine._commands[b.command];
+ // Treat unknown command types as highest priority, to allow us to add
+ // high priority commands in the future without worrying about clients
+ // removing them on each-other unnecessarially.
+ let importA = infoA ? infoA.importance : 0;
+ let importB = infoB ? infoB.importance : 0;
+ // Higher importantance numbers indicate that we care less, so they
+ // go to the end of the list where they'll be popped off.
+ let importDelta = importA - importB;
+ if (importDelta != 0) {
+ return importDelta;
+ }
+ let origIdxA = origOrder.get(a);
+ let origIdxB = origOrder.get(b);
+ // Within equivalent priorities, we put older entries near the end
+ // of the list, so that they are removed first.
+ return origIdxB - origIdxA;
+ });
+ let truncatedCommands = Utils.tryFitItems(commands, maxPayloadSize);
+ if (truncatedCommands.length != record.commands.length) {
+ this._log.warn(`Removing commands from client ${id} (from ${record.commands.length} to ${truncatedCommands.length})`);
+ // Restore original order.
+ record.commands = truncatedCommands.sort((a, b) =>
+ origOrder.get(a) - origOrder.get(b));
+ }
+ }
return record;
},
async itemExists(id) {
return id in (await this.getAllIDs());
},
async getAllIDs() {
--- a/services/sync/modules/engines/tabs.js
+++ b/services/sync/modules/engines/tabs.js
@@ -6,16 +6,17 @@ this.EXPORTED_SYMBOLS = ["TabEngine", "T
var {classes: Cc, interfaces: Ci, utils: Cu} = Components;
const TABS_TTL = 1814400; // 21 days.
const TAB_ENTRIES_LIMIT = 5; // How many URLs to include in tab history.
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.import("resource://gre/modules/Services.jsm");
+Cu.import("resource://gre/modules/Log.jsm");
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/record.js");
Cu.import("resource://services-sync/util.js");
Cu.import("resource://services-sync/constants.js");
XPCOMUtils.defineLazyModuleGetter(this, "PrivateBrowsingUtils",
"resource://gre/modules/PrivateBrowsingUtils.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "SessionStore",
@@ -200,55 +201,39 @@ TabStore.prototype = {
lastUsed: Math.floor((tabState.lastAccessed || 0) / 1000),
});
}
}
return allTabs;
},
- getMaxRecordPayloadSize() {
- // Tabs have a different max size due to being stored using memcached on the
- // server (See bug 1403052), but we still check the server config to make
- // sure we respect the global limits it sets.
- return Math.min(512 * 1024, this.engine.service.getMaxRecordPayloadSize());
- },
-
async createRecord(id, collection) {
let record = new TabSetRecord(collection, id);
record.clientName = this.engine.service.clientsEngine.localName;
// Sort tabs in descending-used order to grab the most recently used
let tabs = this.getAllTabs(true).sort(function(a, b) {
return b.lastUsed - a.lastUsed;
});
- let encoder = new TextEncoder("utf-8");
- // Figure out how many tabs we can pack into a payload.
- // We use byteLength here because the data is not encrypted in ascii yet.
- let size = encoder.encode(JSON.stringify(tabs)).byteLength;
- let origLength = tabs.length;
- const maxPayloadSize = this.getMaxRecordPayloadSize();
- // See bug 535326 comment 8 for an explanation of the estimation
- const MAX_TAB_SIZE = maxPayloadSize / 4 * 3 - 1500;
- if (size > MAX_TAB_SIZE) {
- // Estimate a little more than the direct fraction to maximize packing
- let cutoff = Math.ceil(tabs.length * MAX_TAB_SIZE / size);
- tabs = tabs.slice(0, cutoff + 1);
+ const maxPayloadSize = this.engine.service.getMemcacheMaxRecordPayloadSize();
+ let records = Utils.tryFitItems(tabs, maxPayloadSize);
- // Keep dropping off the last entry until the data fits
- while (encoder.encode(JSON.stringify(tabs)).byteLength > MAX_TAB_SIZE)
- tabs.pop();
+ if (records.length != tabs.length) {
+ this._log.warn(`Can't fit all tabs in sync payload: have ${
+ tabs.length}, but can only fit ${records.length}.`);
}
- this._log.trace("Created tabs " + tabs.length + " of " + origLength);
- tabs.forEach(function(tab) {
- this._log.trace("Wrapping tab: " + JSON.stringify(tab));
- }, this);
+ if (this._log.level <= Log.Level.Trace) {
+ records.forEach(tab => {
+ this._log.trace("Wrapping tab: ", tab);
+ });
+ }
- record.tabs = tabs;
+ record.tabs = records;
return record;
},
async getAllIDs() {
// Don't report any tabs if all windows are in private browsing for
// first syncs.
let ids = {};
let allWindowsArePrivate = false;
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -611,16 +611,27 @@ Sync11Service.prototype = {
}
let payloadMax = config.max_record_payload_bytes;
if (config.max_post_bytes && payloadMax <= config.max_post_bytes) {
return config.max_post_bytes - 4096;
}
return payloadMax;
},
+ getMemcacheMaxRecordPayloadSize() {
+ // Collections stored in memcached ("tabs", "clients" or "meta") have a
+ // different max size than ones stored in the normal storage server db.
+ // In practice, the real limit here is 1M (bug 1300451 comment 40), but
+ // there's overhead involved that is hard to calculate on the client, so we
+ // use 512k to be safe (at the recommendation of the server team). Note
+ // that if the server reports a lower limit (via info/configuration), we
+ // respect that limit instead. See also bug 1403052.
+ return Math.min(512 * 1024, this.getMaxRecordPayloadSize());
+ },
+
async verifyLogin(allow40XRecovery = true) {
if (!this.identity.username) {
this._log.warn("No username in verifyLogin.");
this.status.login = LOGIN_FAILED_NO_USERNAME;
return false;
}
// Attaching auth credentials to a request requires access to
--- a/services/sync/modules/util.js
+++ b/services/sync/modules/util.js
@@ -363,16 +363,52 @@ this.Utils = {
}
let json = typeof obj == "function" ? obj.call(that) : obj;
return CommonUtils.writeJSON(json, path);
},
/**
+ * Helper utility function to fit an array of records so that when serialized,
+ * they will be within payloadSizeMaxBytes. Returns a new array without the
+ * items.
+ */
+ tryFitItems(records, payloadSizeMaxBytes) {
+ // Copy this so that callers don't have to do it in advance.
+ records = records.slice();
+ let encoder = Utils.utf8Encoder;
+ const computeSerializedSize = () =>
+ encoder.encode(JSON.stringify(records)).byteLength;
+ // Figure out how many records we can pack into a payload.
+ // We use byteLength here because the data is not encrypted in ascii yet.
+ let size = computeSerializedSize();
+ // See bug 535326 comment 8 for an explanation of the estimation
+ const maxSerializedSize = payloadSizeMaxBytes / 4 * 3 - 1500;
+ if (maxSerializedSize < 0) {
+ // This is probably due to a test, but it causes very bad behavior if a
+ // test causes this accidentally. We could throw, but there's an obvious/
+ // natural way to handle it, so we do that instead (otherwise we'd have a
+ // weird lower bound of ~1125b on the max record payload size).
+ return [];
+ }
+ if (size > maxSerializedSize) {
+ // Estimate a little more than the direct fraction to maximize packing
+ let cutoff = Math.ceil(records.length * maxSerializedSize / size);
+ records = records.slice(0, cutoff + 1);
+
+ // Keep dropping off the last entry until the data fits.
+ while (computeSerializedSize() > maxSerializedSize) {
+ records.pop();
+ }
+ }
+ return records;
+ },
+
+ /**
* Move a json file in the profile directory. Will fail if a file exists at the
* destination.
*
* @returns a promise that resolves to undefined on success, or rejects on failure
*
* @param aFrom
* Current path to the JSON file saved on disk, relative to profileDir/weave
* .json will be appended to the file name.
@@ -618,16 +654,19 @@ this.Utils = {
XPCOMUtils.defineLazyGetter(Utils, "_utf8Converter", function() {
let converter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
.createInstance(Ci.nsIScriptableUnicodeConverter);
converter.charset = "UTF-8";
return converter;
});
+XPCOMUtils.defineLazyGetter(Utils, "utf8Encoder", () =>
+ new TextEncoder("utf-8"));
+
/*
* Commonly-used services
*/
this.Svc = {};
Svc.Prefs = new Preferences(PREFS_BRANCH);
Svc.Obs = Observers;
Svc.Obs.add("xpcom-shutdown", function() {
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -1807,13 +1807,94 @@ add_task(async function process_incoming
// Let's say we failed to upload and we end up calling processIncomingCommands again
await engine.processIncomingCommands();
ok(tabProcessedSpy.calledOnce);
tabProcessedSpy.restore();
stubRemoveLocalCommand.restore();
});
+add_task(async function test_create_record_command_limit() {
+ await engine._store.wipe();
+ await generateNewKeys(Service.collectionKeys);
+
+ let server = await serverForFoo(engine);
+ await SyncTestingInfrastructure(server);
+
+ const fakeLimit = 4 * 1024;
+
+ let maxSizeStub = sinon.stub(Service,
+ "getMemcacheMaxRecordPayloadSize", () => fakeLimit);
+
+ let user = server.user("foo");
+ let remoteId = Utils.makeGUID();
+
+ _("Create remote client record");
+ server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
+ id: remoteId,
+ name: "Remote client",
+ type: "desktop",
+ commands: [],
+ version: "57",
+ protocols: ["1.5"],
+ }), Date.now() / 1000));
+
+ try {
+ _("Initial sync.");
+ await syncClientsEngine(server);
+
+ _("Send a fairly sane number of commands.");
+
+ for (let i = 0; i < 5; ++i) {
+ await engine.sendURIToClientForDisplay(
+ `https://www.example.com/1/${i}`, remoteId, `Page 1.${i}`);
+ }
+
+ await syncClientsEngine(server);
+
+ _("Make sure they all fit and weren't dropped.");
+ let parsedServerRecord = JSON.parse(JSON.parse(
+ user.collection("clients").payload(remoteId)).ciphertext);
+
+ equal(parsedServerRecord.commands.length, 5);
+
+ await engine.sendCommand("wipeEngine", ["history"], remoteId);
+
+ _("Send a not-sane number of commands.");
+ // Much higher than the maximum number of commands we could actually fit.
+ for (let i = 0; i < 500; ++i) {
+ await engine.sendURIToClientForDisplay(
+ `https://www.example.com/2/${i}`, remoteId, `Page 2.${i}`)
+ }
+
+ await syncClientsEngine(server);
+
+ _("Ensure we didn't overflow the server limit.");
+ let payload = user.collection("clients").payload(remoteId);
+ less(payload.length, fakeLimit);
+
+ _("And that the data we uploaded is both sane json and containing some commands.")
+ let remoteCommands = JSON.parse(JSON.parse(payload).ciphertext).commands;
+ greater(remoteCommands.length, 2);
+ let firstCommand = remoteCommands[0];
+ _("The first command should still be present, since it had a high priority")
+ equal(firstCommand.command, "wipeEngine");
+ _("And the last command in the list should be the last command we sent.");
+ let lastCommand = remoteCommands[remoteCommands.length - 1];
+ equal(lastCommand.command, "displayURI");
+ deepEqual(lastCommand.args, ["https://www.example.com/2/499", engine.localID, "Page 2.499"]);
+ } finally {
+ maxSizeStub.restore();
+ await cleanup();
+ try {
+ let collection = server.getCollection("foo", "clients");
+ collection.remove(remoteId);
+ } finally {
+ await promiseStopServer(server);
+ }
+ }
+});
+
function run_test() {
initTestLogging("Trace");
Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
run_next_test();
}
--- a/services/sync/tests/unit/test_tab_store.js
+++ b/services/sync/tests/unit/test_tab_store.js
@@ -104,16 +104,20 @@ add_task(async function test_createRecor
equal(record.tabs.length, 1);
_("create a big record");
store.getWindowEnumerator = mockGetWindowEnumerator.bind(this, "http://foo.com", 1, numtabs);
record = await store.createRecord("fake-guid");
ok(record instanceof TabSetRecord);
equal(record.tabs.length, 2501);
- store.getMaxRecordPayloadSize = () => 512 * 1024;
- numtabs = 5200;
- _("Modify the max record payload size and create a big record");
- store.getWindowEnumerator = mockGetWindowEnumerator.bind(this, "http://foo.com", 1, numtabs);
- record = await store.createRecord("fake-guid");
- ok(record instanceof TabSetRecord);
- equal(record.tabs.length, 5021);
+ let maxSizeStub = sinon.stub(Service, "getMemcacheMaxRecordPayloadSize", () => 512 * 1024);
+ try {
+ numtabs = 5200;
+ _("Modify the max record payload size and create a big record");
+ store.getWindowEnumerator = mockGetWindowEnumerator.bind(this, "http://foo.com", 1, numtabs);
+ record = await store.createRecord("fake-guid");
+ ok(record instanceof TabSetRecord);
+ equal(record.tabs.length, 5021);
+ } finally {
+ maxSizeStub.restore();
+ }
});