Bug 1289536 (part 1) - Add events to Sync ping. r?gfritzsche
MozReview-Commit-ID: 184fIhelOa6
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -1745,12 +1745,16 @@ Sync11Service.prototype = {
this._log.debug("Server returned invalid JSON for '" + info_type +
"': " + this.response.body);
return callback(ex);
}
this._log.trace("Successfully retrieved '" + info_type + "'.");
return callback(null, result);
});
},
+
+ recordTelemetryEvent(object, method, value, extra = undefined) {
+ Svc.Obs.notify("weave:telemetry:event", { object, method, value, extra });
+ },
};
this.Service = new Sync11Service();
Service.onStartup();
--- a/services/sync/modules/telemetry.js
+++ b/services/sync/modules/telemetry.js
@@ -40,16 +40,18 @@ const TOPICS = [
"weave:engine:sync:start",
"weave:engine:sync:finish",
"weave:engine:sync:error",
"weave:engine:sync:applied",
"weave:engine:sync:uploaded",
"weave:engine:validate:finish",
"weave:engine:validate:error",
+
+ "weave:telemetry:event",
];
const PING_FORMAT_VERSION = 1;
// The set of engines we record telemetry for - any other engines are ignored.
const ENGINES = new Set(["addons", "bookmarks", "clients", "forms", "history",
"passwords", "prefs", "tabs", "extension-storage"]);
@@ -120,16 +122,54 @@ function tryGetMonotonicTimestamp() {
function timeDeltaFrom(monotonicStartTime) {
let now = tryGetMonotonicTimestamp();
if (monotonicStartTime !== -1 && now !== -1) {
return Math.round(now - monotonicStartTime);
}
return -1;
}
+// This function validates the payload of a telemetry "event" - this can be
+// removed once there are APIs available for the telemetry modules to collect
+// these events (bug XXXXXXX) - but for now we simulate that planned API as best
+// we can.
+function validateTelemetryEvent(eventDetails) {
+ let { object, method, value, extra } = eventDetails;
+ // Do do basic validation of the params - everything except "extra" must
+ // be a string. method and object are required.
+ if (typeof method != "string" || typeof object != "string" ||
+ (value && typeof value != "string") ||
+ (extra && typeof extra != "object")) {
+ log.warn("Invalid event parameters - wrong types", eventDetails);
+ return false;
+ }
+ // length checks.
+ if (method.length > 20 || object.length > 20 ||
+ (value && value.length > 80)) {
+ log.warn("Invalid event parameters - wrong lengths", eventDetails);
+ return false;
+ }
+
+ // extra can be falsey, or an object with string names and values.
+ if (extra) {
+ if (Object.keys(extra).length > 10) {
+ log.warn("Invalid event parameters - too many extra keys", eventDetails);
+ return false;
+ }
+ for (let [ename, evalue] of Object.entries(extra)) {
+ if (typeof ename != "string" || ename.length > 15 ||
+ typeof evalue != "string" || evalue.length > 85) {
+ log.warn(`Invalid event parameters: extra item "${ename} is invalid`, eventDetails);
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
class EngineRecord {
constructor(name) {
// startTime is in ms from process start, but is monotonic (unlike Date.now())
// so we need to keep both it and when.
this.startTime = tryGetMonotonicTimestamp();
this.name = name;
}
@@ -411,36 +451,40 @@ class SyncTelemetryImpl {
log.level = Log.Level[Svc.Prefs.get("log.logger.telemetry", "Trace")];
// This is accessible so we can enable custom engines during tests.
this.allowedEngines = allowedEngines;
this.current = null;
this.setupObservers();
this.payloads = [];
this.discarded = 0;
+ this.events = [];
this.maxPayloadCount = Svc.Prefs.get("telemetry.maxPayloadCount");
this.submissionInterval = Svc.Prefs.get("telemetry.submissionInterval") * 1000;
this.lastSubmissionTime = Telemetry.msSinceProcessStart();
+ this.maxEventsCount = Svc.Prefs.get("telemetry.maxEventsCount", 1000);
}
getPingJSON(reason) {
return {
why: reason,
discarded: this.discarded || undefined,
version: PING_FORMAT_VERSION,
syncs: this.payloads.slice(),
+ events: this.events.length == 0 ? undefined : this.events,
};
}
finish(reason) {
// Note that we might be in the middle of a sync right now, and so we don't
// want to touch this.current.
let result = this.getPingJSON(reason);
this.payloads = [];
this.discarded = 0;
+ this.events = [];
this.submit(result);
}
setupObservers() {
for (let topic of TOPICS) {
Observers.add(topic, this, this);
}
}
@@ -492,16 +536,49 @@ class SyncTelemetryImpl {
}
this.current = null;
if ((Telemetry.msSinceProcessStart() - this.lastSubmissionTime) > this.submissionInterval) {
this.finish("schedule");
this.lastSubmissionTime = Telemetry.msSinceProcessStart();
}
}
+ _recordEvent(eventDetails) {
+ if (this.events.length >= this.maxEventsCount) {
+ log.warn("discarding event - already queued our maximum", topic);
+ return;
+ }
+
+ if (!validateTelemetryEvent(eventDetails)) {
+ // we've already logged what the problem is...
+ return;
+ }
+ log.debug("recording event", eventDetails);
+
+ let { object, method, value, extra } = eventDetails;
+ let category = "sync";
+ let ts = Math.floor(tryGetMonotonicTimestamp()); // 1-second resolution for privacy
+
+ // An event record is a simple array with at least 4 items.
+ let event = [ts, category, method, object];
+ // It may have up to 6 elements if |extra| is defined
+ if (value) {
+ event.push(value);
+ if (extra) {
+ event.push(extra);
+ }
+ } else {
+ if (extra) {
+ event.push(null); // a null for the empty value.
+ event.push(extra);
+ }
+ }
+ this.events.push(event);
+ }
+
observe(subject, topic, data) {
log.trace(`observed ${topic} ${data}`);
switch (topic) {
case "profile-before-change":
this.shutdown();
break;
@@ -560,16 +637,20 @@ class SyncTelemetryImpl {
break;
case "weave:engine:validate:error":
if (this._checkCurrent(topic)) {
this.current.onEngineValidateError(data, subject || "Unknown");
}
break;
+ case "weave:telemetry:event":
+ this._recordEvent(subject);
+ break;
+
default:
log.warn(`unexpected observer topic ${topic}`);
break;
}
}
}
this.SyncTelemetry = new SyncTelemetryImpl(ENGINES);
--- a/services/sync/tests/unit/head_helpers.js
+++ b/services/sync/tests/unit/head_helpers.js
@@ -271,17 +271,25 @@ function get_sync_test_telemetry() {
function assert_valid_ping(record) {
// This is called as the test harness tears down due to shutdown. This
// will typically have no recorded syncs, and the validator complains about
// it. So ignore such records (but only ignore when *both* shutdown and
// no Syncs - either of them not being true might be an actual problem)
if (record && (record.why != "shutdown" || record.syncs.length != 0)) {
if (!SyncPingValidator(record)) {
- deepEqual([], SyncPingValidator.errors, "Sync telemetry ping validation failed");
+ if (SyncPingValidator.errors.length) {
+ // validation failed - using a simple |deepEqual([], errors)| tends to
+ // truncate the validation errors in the output and doesn't show that
+ // the ping actually was - so be helpful.
+ do_print("telemetry ping validation failed");
+ do_print("the ping data is: " + JSON.stringify(record, undefined, 2));
+ do_print("the validation failures: " + JSON.stringify(SyncPingValidator.errors, undefined, 2));
+ ok(false, "Sync telemetry ping validation failed - see output above for details");
+ }
}
equal(record.version, 1);
record.syncs.forEach(p => {
lessOrEqual(p.when, Date.now());
if (p.devices) {
ok(!p.devices.some(device => device.id == p.deviceID));
equal(new Set(p.devices.map(device => device.id)).size,
p.devices.length, "Duplicate device ids in ping devices list");
--- a/services/sync/tests/unit/sync_ping_schema.json
+++ b/services/sync/tests/unit/sync_ping_schema.json
@@ -7,16 +7,21 @@
"properties": {
"version": { "type": "integer", "minimum": 0 },
"discarded": { "type": "integer", "minimum": 1 },
"why": { "enum": ["shutdown", "schedule"] },
"syncs": {
"type": "array",
"minItems": 1,
"items": { "$ref": "#/definitions/payload" }
+ },
+ "events": {
+ "type": "array",
+ "minItems": 1,
+ "items": { "$ref": "#/definitions/event" }
}
},
"definitions": {
"payload": {
"type": "object",
"additionalProperties": false,
"required": ["when", "uid", "took"],
"properties": {
@@ -123,16 +128,21 @@
{"required": ["sent"]},
{"required": ["failed"]}
],
"properties": {
"sent": { "type": "integer", "minimum": 1 },
"failed": { "type": "integer", "minimum": 1 }
}
},
+ "event": {
+ "type": "array",
+ "minItems": 4,
+ "maxItems": 6
+ },
"error": {
"oneOf": [
{ "$ref": "#/definitions/httpError" },
{ "$ref": "#/definitions/nsError" },
{ "$ref": "#/definitions/shutdownError" },
{ "$ref": "#/definitions/authError" },
{ "$ref": "#/definitions/otherError" },
{ "$ref": "#/definitions/unexpectedError" },
--- a/services/sync/tests/unit/test_telemetry.js
+++ b/services/sync/tests/unit/test_telemetry.js
@@ -555,9 +555,97 @@ add_task(async function test_no_foreign_
await SyncTestingInfrastructure(server);
try {
let ping = await sync_and_validate_telem();
ok(ping.engines.every(e => e.name !== "bogus"));
} finally {
Service.engineManager.unregister(engine);
await cleanAndGo(server);
}
-});
\ No newline at end of file
+});
+
+add_task(async function test_events() {
+ Service.engineManager.register(BogusEngine);
+ let engine = Service.engineManager.get("bogus");
+ engine.enabled = true;
+ let server = serverForUsers({"foo": "password"}, {
+ meta: {global: {engines: {bogus: {version: engine.version, syncID: engine.syncID}}}},
+ steam: {}
+ });
+
+ await SyncTestingInfrastructure(server);
+ try {
+ Service.recordTelemetryEvent("object", "method", "value", { foo: "bar" });
+ let ping = await wait_for_ping(() => Service.sync(), true, true);
+ equal(ping.events.length, 1);
+ let [timestamp, category, method, object, value, extra] = ping.events[0];
+ ok((typeof timestamp == "number") && timestamp > 0); // timestamp.
+ equal(category, "sync");
+ equal(method, "method");
+ equal(object, "object");
+ equal(value, "value");
+ deepEqual(extra, { foo: "bar" });
+ // Test with optional values.
+ Service.recordTelemetryEvent("object", "method");
+ ping = await wait_for_ping(() => Service.sync(), false, true);
+ equal(ping.events.length, 1);
+ equal(ping.events[0].length, 4);
+
+ Service.recordTelemetryEvent("object", "method", "extra");
+ ping = await wait_for_ping(() => Service.sync(), false, true);
+ equal(ping.events.length, 1);
+ equal(ping.events[0].length, 5);
+
+ Service.recordTelemetryEvent("object", "method", undefined, { foo: "bar" });
+ ping = await wait_for_ping(() => Service.sync(), false, true);
+ equal(ping.events.length, 1);
+ equal(ping.events[0].length, 6);
+ [timestamp, category, method, object, value, extra] = ping.events[0];
+ equal(value, null);
+ } finally {
+ Service.engineManager.unregister(engine);
+ await cleanAndGo(server);
+ }
+});
+
+add_task(async function test_invalid_events() {
+ Service.engineManager.register(BogusEngine);
+ let engine = Service.engineManager.get("bogus");
+ engine.enabled = true;
+ let server = serverForUsers({"foo": "password"}, {
+ meta: {global: {engines: {bogus: {version: engine.version, syncID: engine.syncID}}}},
+ steam: {}
+ });
+
+ async function checkNotRecorded(...args) {
+ Service.recordTelemetryEvent.call(args);
+ let ping = await wait_for_ping(() => Service.sync(), false, true);
+ equal(ping.events, undefined);
+ }
+
+ await SyncTestingInfrastructure(server);
+ try {
+ let long21 = "l".repeat(21);
+ let long81 = "l".repeat(81);
+ let long86 = "l".repeat(86);
+ await checkNotRecorded("object");
+ await checkNotRecorded("object", 2);
+ await checkNotRecorded(2, "method");
+ await checkNotRecorded("object", "method", 2);
+ await checkNotRecorded("object", "method", "value", 2);
+ await checkNotRecorded("object", "method", "value", { foo: 2 });
+ await checkNotRecorded(long21, "method", "value");
+ await checkNotRecorded("object", long21, "value");
+ await checkNotRecorded("object", "method", long81);
+ let badextra = {};
+ badextra[long21] = "x";
+ await checkNotRecorded("object", "method", "value", badextra);
+ badextra = { "x": long86 };
+ await checkNotRecorded("object", "method", "value", badextra);
+ for (let i = 0; i < 10; i++) {
+ badextra["name" + i] = "x";
+ }
+ await checkNotRecorded("object", "method", "value", badextra);
+ } finally {
+ Service.engineManager.unregister(engine);
+ await cleanAndGo(server);
+ }
+});