Bug 1317223 (part 4) -Formalize weak tracking in `BookmarksChangeset`. r=markh
MozReview-Commit-ID: C6otrtSuhZH
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -635,24 +635,30 @@ this.Engine = function Engine(name, serv
this.name = name.toLowerCase();
this.service = service;
this._notify = Utils.notify("weave:engine:");
this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
this._log.level = Log.Level[level];
+ this._modified = this.emptyChangeset();
this._tracker; // initialize tracker to load previously changed IDs
this._log.debug("Engine initialized");
}
Engine.prototype = {
// _storeObj, and _trackerObj should to be overridden in subclasses
_storeObj: Store,
_trackerObj: Tracker,
+ // Override this method to return a new changeset type.
+ emptyChangeset() {
+ return new Changeset();
+ },
+
// Local 'constant'.
// Signal to the engine that processing further records is pointless.
eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
// Should we keep syncing if we find a record that cannot be uploaded (ever)?
// If this is false, we'll throw, otherwise, we'll ignore the record and
// continue. This currently can only happen due to the record being larger
// than the record upload limit.
@@ -898,24 +904,33 @@ SyncEngine.prototype = {
/*
* Returns a changeset for this sync. Engine implementations can override this
* method to bypass the tracker for certain or all changed items.
*/
getChangedIDs() {
return this._tracker.changedIDs;
},
- // Create a new record using the store and add in crypto fields.
+ // Create a new record using the store and add in metadata.
_createRecord(id) {
let record = this._store.createRecord(id, this.name);
record.id = id;
record.collection = this.name;
return record;
},
+ // Creates a tombstone Sync record with additional metadata.
+ _createTombstone(id) {
+ let tombstone = new this._recordObj(this.name, id);
+ tombstone.id = id;
+ tombstone.collection = this.name;
+ tombstone.deleted = true;
+ return tombstone;
+ },
+
// Any setup that needs to happen at the beginning of each sync.
_syncStartup() {
// Determine if we need to wipe on outdated versions
let metaGlobal = this.service.recordManager.get(this.metaURL);
let engines = metaGlobal.payload.engines || {};
let engineData = engines[this.name] || {};
@@ -957,22 +972,24 @@ SyncEngine.prototype = {
// Save objects that need to be uploaded in this._modified. We also save
// the timestamp of this fetch in this.lastSyncLocal. As we successfully
// upload objects we remove them from this._modified. If an error occurs
// or any objects fail to upload, they will remain in this._modified. At
// the end of a sync, or after an error, we add all objects remaining in
// this._modified to the tracker.
this.lastSyncLocal = Date.now();
+ let initialChanges;
if (this.lastSync) {
- this._modified = this.pullNewChanges();
+ initialChanges = this.pullNewChanges();
} else {
this._log.debug("First sync, uploading all items");
- this._modified = this.pullAllChanges();
+ initialChanges = this.pullAllChanges();
}
+ this._modified.replace(initialChanges);
// Clear the tracker now. If the sync fails we'll add the ones we failed
// to upload back.
this._tracker.clearChangedIDs();
this._log.info(this._modified.count() +
" outgoing items pre-reconciliation");
// Keep track of what to delete at the end of sync
@@ -1427,17 +1444,17 @@ SyncEngine.prototype = {
// If the local item was modified, we carry its metadata forward so
// appropriate reconciling can be performed.
if (this._modified.has(localDupeGUID)) {
locallyModified = true;
localAge = this._tracker._now() - this._modified.getModifiedTimestamp(localDupeGUID);
remoteIsNewer = remoteAge < localAge;
- this._modified.swap(localDupeGUID, item.id);
+ this._modified.changeID(localDupeGUID, item.id);
} else {
locallyModified = false;
localAge = null;
}
// Tell the engine to do whatever it needs to switch the items.
this._switchItemToDupe(localDupeGUID, item);
@@ -1750,32 +1767,32 @@ SyncEngine.prototype = {
* ensure we always use the remote version if one exists.
*
* This function is only called for the first sync. Subsequent syncs call
* `pullNewChanges`.
*
* @return A `Changeset` object.
*/
pullAllChanges() {
- let changeset = new Changeset();
+ let changes = {};
for (let id in this._store.getAllIDs()) {
- changeset.set(id, 0);
+ changes[id] = 0;
}
- return changeset;
+ return changes;
},
/*
* Returns a changeset containing entries for all currently tracked items.
* The default implementation returns a changeset with timestamps indicating
* when the item was added to the tracker.
*
* @return A `Changeset` object.
*/
pullNewChanges() {
- return new Changeset(this.getChangedIDs());
+ return this.getChangedIDs();
},
/**
* Adds all remaining changeset entries back to the tracker, typically for
* items that failed to upload. This method is called at the end of each sync.
*
*/
trackRemainingChanges() {
@@ -1787,51 +1804,56 @@ SyncEngine.prototype = {
/**
* A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
* and stores opaque change data for tracked IDs. The default implementation
* only records timestamps, though engines can extend this to store additional
* data for each entry.
*/
class Changeset {
- // Creates a changeset with an initial set of tracked entries.
- constructor(changes = {}) {
- this.changes = changes;
+ // Creates an empty changeset.
+ constructor() {
+ this.changes = {};
}
// Returns the last modified time, in seconds, for an entry in the changeset.
// `id` is guaranteed to be in the set.
getModifiedTimestamp(id) {
return this.changes[id];
}
// Adds a change for a tracked ID to the changeset.
set(id, change) {
this.changes[id] = change;
}
- // Adds multiple entries to the changeset.
+ // Adds multiple entries to the changeset, preserving existing entries.
insert(changes) {
Object.assign(this.changes, changes);
}
+ // Overwrites the existing set of tracked changes with new entries.
+ replace(changes) {
+ this.changes = changes;
+ }
+
// Indicates whether an entry is in the changeset.
has(id) {
return id in this.changes;
}
// Deletes an entry from the changeset. Used to clean up entries for
// reconciled and successfully uploaded records.
delete(id) {
delete this.changes[id];
}
- // Swaps two entries in the changeset. Used when reconciling duplicates that
- // have local changes.
- swap(oldID, newID) {
+ // Changes the ID of an entry in the changeset. Used when reconciling
+ // duplicates that have local changes.
+ changeID(oldID, newID) {
this.changes[newID] = this.changes[oldID];
delete this.changes[oldID];
}
// Returns an array of all tracked IDs in this changeset.
ids() {
return Object.keys(this.changes);
}
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -272,16 +272,20 @@ BookmarksEngine.prototype = {
_storeObj: BookmarksStore,
_trackerObj: BookmarksTracker,
version: 2,
_defaultSort: "index",
syncPriority: 4,
allowSkippedRecord: false,
+ emptyChangeset() {
+ return new BookmarksChangeset();
+ },
+
_guidMapFailed: false,
_buildGUIDMap: function _buildGUIDMap() {
let store = this._store;
let guidMap = {};
let tree = Async.promiseSpinningly(PlacesUtils.promiseBookmarksTree(""));
function* walkBookmarksTree(tree, parent = null) {
@@ -514,25 +518,31 @@ BookmarksEngine.prototype = {
},
_syncCleanup: function _syncCleanup() {
SyncEngine.prototype._syncCleanup.call(this);
delete this._guidMap;
},
_createRecord: function _createRecord(id) {
+ if (this._modified.isTombstone(id)) {
+ // If we already know a changed item is a tombstone, just create the
+ // record without dipping into Places.
+ return this._createTombstone(id);
+ }
// Create the record as usual, but mark it as having dupes if necessary.
let record = SyncEngine.prototype._createRecord.call(this, id);
let entry = this._mapDupe(record);
if (entry != null && entry.hasDupe) {
record.hasDupe = true;
}
if (record.deleted) {
- // Make sure deleted items are marked as tombstones. This handles the
- // case where a changed item is deleted during a sync.
+ // Make sure deleted items are marked as tombstones. We do this here
+ // in addition to the `isTombstone` call above because it's possible
+ // a changed bookmark might be deleted during a sync (bug 1313967).
this._modified.setTombstone(record.id);
}
return record;
},
_findDupe: function _findDupe(item) {
this._log.trace("Finding dupe for " + item.id +
" (already duped: " + item.hasDupe + ").");
@@ -549,18 +559,17 @@ BookmarksEngine.prototype = {
return mapped ? mapped.toString() : mapped;
},
pullAllChanges() {
return this.pullNewChanges();
},
pullNewChanges() {
- let changes = Async.promiseSpinningly(this._tracker.promiseChangedIDs());
- return new BookmarksChangeset(changes);
+ return Async.promiseSpinningly(this._tracker.promiseChangedIDs());
},
trackRemainingChanges() {
let changes = this._modified.changes;
Async.promiseSpinningly(PlacesSyncUtils.bookmarks.pushChanges(changes));
},
_deleteId(id) {
@@ -1061,38 +1070,99 @@ BookmarksTracker.prototype = {
this.score += SCORE_INCREMENT_XLARGE;
this._batchSawScoreIncrement = false;
}
},
onItemVisited() {}
};
class BookmarksChangeset extends Changeset {
+ constructor() {
+ super();
+ // Weak changes are part of the changeset, but don't bump the change
+ // counter, and aren't persisted anywhere.
+ this.weakChanges = {};
+ }
+
getModifiedTimestamp(id) {
let change = this.changes[id];
- if (!change || change.synced) {
+ if (change) {
// Pretend the change doesn't exist if we've already synced or
// reconciled it.
- return Number.NaN;
+ return change.synced ? Number.NaN : change.modified;
}
- return change.modified;
+ if (this.weakChanges[id]) {
+ // For weak changes, we use a timestamp from long ago to ensure we always
+ // prefer the remote version in case of conflicts.
+ return 0;
+ }
+ return Number.NaN;
+ }
+
+ setWeak(id, { tombstone = false } = {}) {
+ this.weakChanges[id] = { tombstone };
}
has(id) {
- return id in this.changes && !this.changes[id].synced;
+ let change = this.changes[id];
+ if (change) {
+ return !change.synced;
+ }
+ return !!this.weakChanges[id];
}
setTombstone(id) {
let change = this.changes[id];
if (change) {
change.tombstone = true;
}
+ let weakChange = this.weakChanges[id];
+ if (weakChange) {
+ // Not strictly necessary, since we never persist weak changes, but may
+ // be useful for bookkeeping.
+ weakChange.tombstone = true;
+ }
}
delete(id) {
let change = this.changes[id];
if (change) {
// Mark the change as synced without removing it from the set. We do this
// so that we can update Places in `trackRemainingChanges`.
change.synced = true;
}
+ delete this.weakChanges[id];
+ }
+
+ changeID(oldID, newID) {
+ super.changeID(oldID, newID);
+ this.weakChanges[newID] = this.weakChanges[oldID];
+ delete this.weakChanges[oldID];
+ }
+
+ ids() {
+ let results = new Set();
+ for (let id in this.changes) {
+ results.add(id);
+ }
+ for (let id in this.weakChanges) {
+ results.add(id);
+ }
+ return [...results];
+ }
+
+ clear() {
+ super.clear();
+ this.weakChanges = {};
+ }
+
+ isTombstone(id) {
+ let change = this.changes[id];
+ if (change) {
+ return change.tombstone;
+ }
+ let weakChange = this.weakChanges[id];
+ if (weakChange) {
+ return weakChange.tombstone;
+ }
+ return false;
}
}
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -62,17 +62,17 @@ HistoryEngine.prototype = {
} finally {
notifyHistoryObservers("onEndUpdateBatch");
}
},
pullNewChanges() {
let modifiedGUIDs = Object.keys(this._tracker.changedIDs);
if (!modifiedGUIDs.length) {
- return new Changeset();
+ return {};
}
let db = PlacesUtils.history.QueryInterface(Ci.nsPIPlacesDatabase)
.DBConnection;
// Filter out hidden pages and `TRANSITION_FRAMED_LINK` visits. These are
// excluded when rendering the history menu, so we use the same constraints
// for Sync. We also don't want to sync `TRANSITION_EMBED` visits, but those
@@ -100,17 +100,17 @@ HistoryEngine.prototype = {
let results = Async.querySpinningly(statement, ["guid"]);
let guids = results.map(result => result.guid);
this._tracker.removeChangedID(...guids);
} finally {
statement.finalize();
}
}
- return new Changeset(this._tracker.changedIDs);
+ return this._tracker.changedIDs;
},
};
function HistoryStore(name, engine) {
Store.call(this, name, engine);
// Explicitly nullify our references to our cached services so we don't leak
Svc.Obs.add("places-shutdown", function() {
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -196,17 +196,17 @@ add_task(async function test_change_duri
collection.insert(bmk4_guid, encryptPayload(localTaggedBmk.cleartext));
}
await assertChildGuids(folder1_guid, [bmk1_guid], "Folder should have 1 child before first sync");
_("Perform first sync");
{
let changes = engine.pullNewChanges();
- deepEqual(changes.ids().sort(), [folder1_guid, bmk1_guid, "toolbar"].sort(),
+ deepEqual(Object.keys(changes).sort(), [folder1_guid, bmk1_guid, "toolbar"].sort(),
"Should track bookmark and folder created before first sync");
await sync_engine_and_validate_telem(engine, false);
}
let bmk2_id = store.idForGUID(bmk2_guid);
let bmk3_guid = store.GUIDForId(bmk3_id);
_(`Mozilla GUID: ${bmk3_guid}`);
{
@@ -225,17 +225,17 @@ add_task(async function test_change_duri
equal(taggedURIs.length, 1, "Should have 1 tagged URI");
equal(taggedURIs[0].spec, "https://example.org/",
"Synced tagged bookmark should appear in tagged URI list");
}
_("Perform second sync");
{
let changes = engine.pullNewChanges();
- deepEqual(changes.ids().sort(), [bmk3_guid, folder1_guid].sort(),
+ deepEqual(Object.keys(changes).sort(), [bmk3_guid, folder1_guid].sort(),
"Should track bookmark added during last sync and its parent");
await sync_engine_and_validate_telem(engine, false);
ok(collection.wbo(bmk3_guid),
"Bookmark created during first sync should be uploaded during second sync");
await assertChildGuids(folder1_guid, [bmk1_guid, bmk3_guid, bmk2_guid],
"Folder 1 should have same children after second sync");
--- a/services/sync/tests/unit/test_history_tracker.js
+++ b/services/sync/tests/unit/test_history_tracker.js
@@ -69,32 +69,31 @@ async function addVisit(suffix, referrer
function run_test() {
initTestLogging("Trace");
Log.repository.getLogger("Sync.Tracker.History").level = Log.Level.Trace;
run_next_test();
}
async function verifyTrackerEmpty() {
let changes = engine.pullNewChanges();
- equal(changes.count(), 0);
+ do_check_empty(changes);
equal(tracker.score, 0);
}
async function verifyTrackedCount(expected) {
let changes = engine.pullNewChanges();
- equal(changes.count(), expected);
+ do_check_attribute_count(changes, expected);
}
async function verifyTrackedItems(tracked) {
let changes = engine.pullNewChanges();
- let trackedIDs = new Set(changes.ids());
+ let trackedIDs = new Set(Object.keys(changes));
for (let guid of tracked) {
- ok(changes.has(guid), `${guid} should be tracked`);
- ok(changes.getModifiedTimestamp(guid) > 0,
- `${guid} should have a modified time`);
+ ok(guid in changes, `${guid} should be tracked`);
+ ok(changes[guid] > 0, `${guid} should have a modified time`);
trackedIDs.delete(guid);
}
equal(trackedIDs.size, 0, `Unhandled tracked IDs: ${
JSON.stringify(Array.from(trackedIDs))}`);
}
async function startTracking() {
Svc.Obs.notify("weave:engine:start-tracking");