vcsreplicator: teach pulse notifier to skip messages (bug 1312238); r?fubar

We ran into an issue over the weekend where pulsenotifier.service got
wedged processing a bad message. This is the first such incident in
months.

Unfortunately, we didn't have a good way for the pulse notification
daemon to skip over "bad" messages, and I had to hack source code on
the server to restore service.

This commit introduces a "--skip" flag that, when specified, instructs
the daemon to skip the next message and then exit.

The operational docs for this service have been updated to describe
how to use the flag in production.

MozReview-Commit-ID: GtKjsj4Lpb9
--- a/docs/hgmo/ops.rst
+++ b/docs/hgmo/ops.rst
@@ -638,16 +638,44 @@ The Pulse notification daemon should rec
If the ``pulsenotifier`` daemon has crashed, try restarting it::
$ systemctl restart pulsenotifier.service
If the hg.mozilla.org Kafka cluster is down, lots of other alerts are
likely firing. You should alert VCS on call.
+In some cases, ``pulsenotifier`` may repeatedly crash due to a malformed input
+message or other bad data. Essentially, the process encounters bad input,
+crashes, restarts via systemd, encounters the same message again, crashes, and
+the cycle repeats until systemd gives up. This scenario should be rare, which is
+why the daemon doesn't ignore *bad* messages (ignoring messages could lead to
+data loss).
+
+If the daemon becomes wedged on a specific message, you can tell the daemon to
+skip the next message by running::
+
+ $ /var/hg/venv_tools/bin/vcsreplicator-pulse-notifier --skip /etc/mercurial/notifications.ini
+
+This command will print a message like::
+
+ skipped hg-repo-init-2 message in partition 0 for group pulsenotifier
+
+It then exits. You can restart the daemon (if necessary) via::
+
+ $ systemctl start pulsenotifier.service
+
+Repeat as many times as necessary to clear through the *bad* messages.
+
+.. important::
+
+ If you skip messages, please file a bug against
+ ``Developer Services :: hg.mozilla.org`` and include the systemd journal
+ output for ``pulsenotifier.service`` showing the error messages.
+
Adding/Removing Nodes from Zookeeper and Kafka
==============================================
When new servers are added or removed, the Zookeeper and Kafka clusters
may need to be *rebalanced*. This typically only happens when servers
are replaced.
The process is complicated and requires a number of manual steps. It
--- a/pylib/vcsreplicator/tests/test-pulse-notifier.t
+++ b/pylib/vcsreplicator/tests/test-pulse-notifier.t
@@ -142,11 +142,34 @@ Routing keys with slashes and dashes and
push_json_url: https://hg.mozilla.org/integration/foo_Bar-baz/json-pushes?version=2&startID=0&endID=1
pushid: 1
time: \d+ (re)
user: user@example.com
repo_url: https://hg.mozilla.org/integration/foo_Bar-baz
source: serve
type: changegroup.1
+ $ cd ..
+
+Pulse client can skip messages
+
+ $ hgmo exec hgssh supervisorctl stop pulsenotifier
+ pulsenotifier: stopped
+
+ $ hgmo create-repo ignored-repo scm_level_1
+ (recorded repository creation in replication log)
+
+ $ hgmo exec hgweb0 /var/hg/venv_replication/bin/vcsreplicator-consumer --wait-for-no-lag /etc/mercurial/vcsreplicator.ini
+ $ hgmo exec hgweb1 /var/hg/venv_replication/bin/vcsreplicator-consumer --wait-for-no-lag /etc/mercurial/vcsreplicator.ini
+
+ $ hgmo exec hgssh /var/hg/venv_tools/bin/vcsreplicator-pulse-notifier --skip /etc/mercurial/notifications.ini
+ skipped heartbeat-1 message in partition 0 for group pulsenotifier
+ $ hgmo exec hgssh /var/hg/venv_tools/bin/vcsreplicator-pulse-notifier --skip /etc/mercurial/notifications.ini
+ skipped hg-repo-init-2 message in partition 0 for group pulsenotifier
+
+ $ pulseconsumer --wait-for-no-lag
+
+ $ pulse dump-messages exchange/hgpushes/v2 v2
+ []
+
Cleanup
$ hgmo clean
--- a/pylib/vcsreplicator/vcsreplicator/pulsenotifier.py
+++ b/pylib/vcsreplicator/vcsreplicator/pulsenotifier.py
@@ -117,16 +117,18 @@ def on_event(config, message_type, data)
def cli():
"""Command line interface to run the Pulse notification daemon."""
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('config', help='Path to config file to load')
+ parser.add_argument('--skip', action='store_true',
+ help='Skip the consuming of the next message then exit')
args = parser.parse_args()
config = Config(filename=args.config)
if not config.c.has_section('pulse'):
print('no [pulse] config section')
sys.exit(1)
@@ -142,16 +144,34 @@ def cli():
# hglib will use 'hg' which relies on PATH being correct. Since we're
# running from a virtualenv, PATH may not be set unless the virtualenv
# is activated. Overwrite the hglib defaults with a value from the config.
hglib.HGPATH = config.hg_path
client = config.get_client_from_section('pulseconsumer', timeout=5)
with Consumer(client, group, topic, partitions=None) as consumer:
+ if args.skip:
+ r = consumer.get_message()
+ if not r:
+ print('no message available; nothing to skip')
+ sys.exit(1)
+
+ partition = r[0]
+
+ try:
+ message_type = r[2]['name']
+ except Exception:
+ message_type = 'UNKNOWN'
+
+ consumer.commit(partitions=[partition])
+ print('skipped %s message in partition %d for group %s' % (
+ message_type, partition, group))
+ sys.exit(0)
+
cbkwargs = {
'config': config,
}
res = run_in_loop(logger, consume_one, config=config, consumer=consumer,
cb=on_event, cbkwargs=cbkwargs)
logger.warn('process exiting code %s' % res)
sys.exit(res)
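
The ``--skip`` branch above boils down to: fetch one message, commit the
offset for its partition without processing it, and exit. The following is a
minimal, self-contained sketch of that logic for experimentation outside a
Kafka deployment. ``FakeConsumer`` and ``skip_one`` are hypothetical names
that do not exist in vcsreplicator, and the (partition, offset, payload)
tuple layout is an assumption inferred from how the patch indexes ``r[0]``
and ``r[2]``.

```python
"""Sketch of the skip-then-commit logic, using a stand-in consumer."""


class FakeConsumer:
    """Hypothetical stand-in for the vcsreplicator Consumer (not the real class).

    Messages are assumed to be (partition, offset, payload) tuples, mirroring
    how the patch reads r[0] and r[2] from get_message().
    """

    def __init__(self, messages):
        self._messages = list(messages)
        self._position = 0  # index of the next uncommitted message

    def get_message(self):
        # Return the next uncommitted message, or None if there is none.
        if self._position >= len(self._messages):
            return None
        return self._messages[self._position]

    def commit(self, partitions=None):
        # Committing advances past the message that was just fetched.
        self._position += 1


def skip_one(consumer, group):
    """Skip the next message; return (exit_code, text to print)."""
    r = consumer.get_message()
    if not r:
        return 1, 'no message available; nothing to skip'

    partition = r[0]

    try:
        message_type = r[2]['name']
    except Exception:
        # A malformed payload must not prevent the skip itself.
        message_type = 'UNKNOWN'

    consumer.commit(partitions=[partition])
    return 0, 'skipped %s message in partition %d for group %s' % (
        message_type, partition, group)


if __name__ == '__main__':
    consumer = FakeConsumer([
        (0, 0, {'name': 'heartbeat-1'}),
        (0, 1, 'not a dict'),  # payload that cannot be indexed by 'name'
    ])
    for _ in range(3):
        print(skip_one(consumer, 'pulsenotifier'))
```

Returning the exit code instead of calling ``sys.exit()`` directly is a choice
made only for this sketch so the function is easy to exercise in isolation;
the patch itself exits from within ``cli()``.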