Bug 1313967 - Buffer outgoing Sync records in memory. r?markh
The current design of `Engine::_uploadOutgoing` makes it hard to upload
records that aren't in `Engine::_modified`. This commit extracts the
`_createRecord` calls into a `fetchOutgoingRecords` method that returns
an array of encrypted records for upload. Engines can override this
method to include additional records.
This is used in
bug 1303679 to add a cleanup stage that writes
tombstones for records that shouldn't be on the server.
MozReview-Commit-ID: 9iePxJzWl09
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -1486,22 +1486,22 @@ SyncEngine.prototype = {
item.id);
return remoteIsNewer;
},
// Upload outgoing records.
_uploadOutgoing: function () {
this._log.trace("Uploading local changes to server.");
- let modifiedIDs = this._modified.ids();
- if (modifiedIDs.length) {
- this._log.trace("Preparing " + modifiedIDs.length +
+ let records = this.fetchOutgoingRecords();
+ if (records.length) {
+ this._log.trace("Uploading " + records.length +
" outgoing records");
- let counts = { sent: modifiedIDs.length, failed: 0 };
+ let counts = { sent: records.length, failed: 0 };
// collection we'll upload
let up = new Collection(this.engineURL, null, this.service);
let failed = [];
let successful = [];
let handleResponse = (resp, batchOngoing = false) => {
// Note: We don't want to update this.lastSync, or this._modified until
@@ -1542,48 +1542,58 @@ SyncEngine.prototype = {
// clear for next batch
failed.length = 0;
successful.length = 0;
};
let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse);
- for (let id of modifiedIDs) {
- let out;
- let ok = false;
- try {
- out = this._createRecord(id);
- if (this._log.level <= Log.Level.Trace)
- this._log.trace("Outgoing: " + out);
-
- out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
- ok = true;
- } catch (ex) {
- if (Async.isShutdownException(ex)) {
- throw ex;
- }
- this._log.warn("Error creating record", ex);
- }
- if (ok) {
- let { enqueued, error } = postQueue.enqueue(out);
- if (!enqueued) {
- ++counts.failed;
- if (!this.allowSkippedRecord) {
- throw error;
- }
+ for (let out of records) {
+ let { enqueued, error } = postQueue.enqueue(out);
+ if (!enqueued) {
+ ++counts.failed;
+ if (!this.allowSkippedRecord) {
+ throw error;
}
}
- this._store._sleep(0);
}
postQueue.flush(true);
Observers.notify("weave:engine:sync:uploaded", counts, this.name);
}
},
+ fetchOutgoingRecords() {
+ let modifiedIDs = this._modified.ids();
+ this._log.trace("Fetching " + modifiedIDs.length +
+ " outgoing records");
+
+ let records = [];
+ for (let id of modifiedIDs) {
+ try {
+ let out = this._createRecord(id);
+ if (this._log.level <= Log.Level.Trace) {
+ this._log.trace("Outgoing: " + out);
+ }
+ out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
+ records.push(out);
+ } catch (ex) {
+ if (Async.isShutdownException(ex)) {
+ throw ex;
+ }
+ this._log.warn(`Error creating record for ${id}`, ex);
+ }
+ // `_createRecord` might perform synchronous I/O, so we yield back to the
+ // main thread. See bug 636312.
+ this._store._sleep(0);
+ }
+
+ return records;
+ },
+
_onRecordsWritten(succeeded, failed) {
// Implement this method to take specific actions against successfully
// uploaded records and failed records.
},
// Any cleanup necessary.
// Save the current snapshot so as to calculate changes at next sync
_syncFinish: function () {