mozreviewpulse: add a FIFO pulse consumer (Bug 1287537). r?mars
This lays the groundwork for having mozreview consume various
messages from pulse.
MozReview-Commit-ID: 9x5usNSf1R3
new file mode 100644
--- /dev/null
+++ b/mozreviewpulse/mozreviewpulse/consumer.py
@@ -0,0 +1,167 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import logging
+import Queue
+
+import kombu
+
+
+class NoQueueDefined(RuntimeError):
+ """Signals an attempt to consume with no queue (not yet raised here)."""
+ pass
+
+
+class FIFOPulseConsumer(object):
+ """A pulse consumer which consumes in FIFO order.
+
+ This consumer is designed to be the sole consumer of
+ a pulse queue. If other processes are able to consume
+ from the queue at the same time there is no guarantee of
+ FIFO processing of the queue.
+
+ In the event that processing of a message cannot be
+ completed, but it should be retried as the next
+ message of the queue, the callback should not ack
+ or reject the message. An ignored message will
+ stay at the head of the queue until it is rejected
+ or acked.
+
+ Messages should never be requeued (also known as
+ rejected with requeue), since the delivery mechanics
+ are not as well defined and it could violate FIFO
+ ordering. Instead the message should not be acked
+ or nacked, which will cause it to be processed again
+ as previously mentioned.
+ """
+
+ def __init__(self, host, port, userid=None, password=None,
+ ssl=False, timeout=1.0, exchange=None, queue=None,
+ routing_key=None, callback=None, logger=None, **kwargs):
+ """Initialize the consumer.
+
+ Args:
+ host (str): The Pulse hostname.
+ port (int): The Pulse server port.
+ userid (str): The Pulse user to authenticate as.
+ password (str): The Pulse user password for authentication.
+ ssl (bool): Whether to use SSL when connecting to Pulse.
+ timeout (float): Seconds to wait when fetching a message from Pulse.
+ exchange (str): The Pulse exchange name where messages are
+ produced.
+ queue (str): The Pulse queue to consume from.
+ routing_key (str): The routing key to use for the queue.
+ callback (callable): A callable to be called for each message. The
+ callable will be passed the message as the first argument, and
+ the consumer instance through the `consumer` kwarg.
+ logger (logging.Logger): The logger to use; defaults to the module logger.
+ **kwargs: Extra keyword arguments; accepted but unused.
+ """
+ self.host = host
+ self.port = port
+ self.userid = userid
+ self.password = password
+ self.ssl = ssl
+ self.timeout = float(timeout)
+
+ self.exchange = exchange
+ self.queue = queue
+ self.routing_key = routing_key
+ self.callback = callback
+
+ self.logger = logger or logging.getLogger(__name__)
+
+ def _create_connection(self):
+ """Create a kombu.Connection from the stored host and credentials."""
+ return kombu.Connection(
+ hostname=self.host, port=self.port, userid=self.userid,
+ password=self.password, ssl=self.ssl)
+
+ def _create_simple_queue(self, connection):
+ """Create a kombu.SimpleQueue from a kombu.Connection."""
+ exchange = kombu.Exchange(
+ self.exchange, type='topic', durable=True, passive=True)
+ queue = kombu.Queue(
+ name=self.queue, exchange=exchange,
+ durable=True, routing_key=self.routing_key,
+ exclusive=False, auto_delete=False)
+ return connection.SimpleQueue(queue)
+
+ def _process_message(self, msg, connection, simple_queue):
+ """Hand a message to the callback and count it as processed.
+
+ Return True if the consumer must reconnect before
+ consuming another message or False if the consumer
+ is fine to continue with the same connection.
+ """
+ self.callback(msg, consumer=self)
+ self._processed += 1
+
+ # If the callback did not ack/nack the message we
+ # must reconnect before consuming another message
+ # so that the same message is redelivered.
+ #
+ # We could deal with requeuing in the same way, but
+ # differentiating between a requeue and the other
+ # ack states requires touching "private" properties
+ # of the message. It would not provide any meaningful
+ # extra functionality though, since it would act the
+ # same as unacknowledged messages after a reconnect.
+ return not msg.acknowledged
+
+ def _consume(self, connection, limit=None):
+ """Consume messages on one connection until told to stop.
+
+ Return False when a limit is set and the queue is empty;
+ otherwise return True and let consume() decide whether to go on.
+ """
+ simple_queue = self._create_simple_queue(connection)
+
+ try:
+ while limit is None or self._processed < limit:
+ try:
+ msg = simple_queue.get(timeout=self.timeout)
+ except Queue.Empty:
+ if limit is not None:
+ return False
+ else:
+ continue
+
+ if self._process_message(msg, connection, simple_queue):
+ # We must reconnect so that this message is
+ # delivered to us again.
+ return True
+
+ except connection.recoverable_connection_errors:
+ # Fall through; consume() creates a fresh connection and retries.
+ pass
+ except connection.connection_errors:
+ # TODO: Should we implement some sort of backoff for these
+ # irrecoverable errors? For now ignore them and just keep
+ # trying to connect in consume()
+ pass
+
+ return True
+
+ def consume(self, limit=None):
+ """Consume pulse messages.
+
+ If a `limit` is provided a `limit` number of messages will be
+ consumed or consumption will stop when the queue is empty,
+ whichever comes first. Otherwise, `consume` will block and wait
+ for messages, consuming indefinitely.
+
+ The number of messages processed in a call to consume
+ will be returned.
+ """
+ self._processed = 0
+
+ while limit is None or self._processed < limit:
+ connection = self._create_connection()
+
+ with connection.ensure_connection() as connection:
+ if not self._consume(connection, limit=limit):
+ break
+
+ return self._processed
--- a/mozreviewpulse/tests/conftest.py
+++ b/mozreviewpulse/tests/conftest.py
@@ -72,8 +72,19 @@ def pulse_server(request, docker):
@pytest.fixture
def pulse_conn(request, pulse_server):
conn = kombu.Connection(pulse_server, port=5672)
request.addfinalizer(conn.release)
# Wait for the service to come up
conn.ensure_connection(
max_retries=10, interval_start=0.3, interval_step=0.3)
return conn
+
+
+@pytest.fixture
+def pulse_producer(pulse_conn):
+ """Return a kombu.Producer for the 'exchange/mrp/' topic exchange."""
+ exchange = kombu.Exchange('exchange/mrp/', type='topic')
+ producer = kombu.Producer(pulse_conn, exchange=exchange)
+
+ # Declare the exchange now so consumers can listen before any publish.
+ producer.maybe_declare(producer.exchange)
+ return producer
new file mode 100644
--- /dev/null
+++ b/mozreviewpulse/tests/test_consumer.py
@@ -0,0 +1,196 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+Pulse Consumer tests.
+"""
+import pytest
+
+from mozreviewpulse.consumer import FIFOPulseConsumer
+
+
+class MessageCollector(list):
+ def clear(self):
+ del self[:]
+
+ def ack_callback(self, message, consumer=None, *args, **kwargs):
+ self.append(message)
+ message.ack()
+
+ def reject_callback(self, message, consumer=None, *args, **kwargs):
+ self.append(message)
+ message.reject()
+
+ def requeue_callback(self, message, consumer=None, *args, **kwargs):
+ self.append(message)
+ message.requeue()
+
+ def ignore_callback(self, message, consumer=None, *args, **kwargs):
+ self.append(message)
+ # Deliberately neither ack nor reject: the message is only recorded.
+
+ def id_for(self, i):
+ return self[i].payload['id']
+
+
+def build_consumer(producer, queue, routing_key='testmessage',
+ consumer_class=FIFOPulseConsumer, **kwargs):
+ """Construct a consumer and message collector for the producer.
+
+ Returns (consumer, msgs). The consumer should be created
+ before producing any messages.
+ """
+ msgs = MessageCollector()
+ consumer = consumer_class(
+ host=producer.connection.hostname,
+ port=producer.connection.port or 5672,
+ exchange=producer.exchange.name,
+ queue=queue,
+ routing_key=routing_key,
+ callback=msgs.ack_callback,
+ **kwargs)
+
+ # Attempt to consume a single message, which will time out,
+ # so that our queue is declared.
+ assert consumer.consume(limit=1) == 0
+ assert len(msgs) == 0
+ return consumer, msgs
+
+
+def produce_incrementing_messages(producer, n, routing_key='testmessage'):
+ """Publish n messages with ids 0..n-1, in order, on the producer."""
+ for i in xrange(n):
+ producer.publish({'id': i}, routing_key=routing_key, retry=True)
+
+
+def test_limit_messages_consumed(pulse_producer):
+ consumer, msgs = build_consumer(pulse_producer,
+ 'limit_messages_consumed_queue')
+
+ # Publish three messages for the consumer to pick up.
+ produce_incrementing_messages(pulse_producer, 3)
+
+ # With the limit set to 1, exactly one message should be
+ # consumed even though more are queued.
+ assert consumer.consume(limit=1) == 1
+ assert len(msgs) == 1
+
+ # A limit above the queue depth stops once the queue is empty,
+ # so only the two remaining messages are consumed.
+ assert consumer.consume(limit=10) == 2
+ assert len(msgs) == 3
+
+
+def test_fifo_ack_removes_message(pulse_producer):
+ consumer, msgs = build_consumer(pulse_producer,
+ 'fifo_ack_removes_message_queue')
+ produce_incrementing_messages(pulse_producer, 2)
+
+ consumer.callback = msgs.ack_callback
+ assert consumer.consume(limit=1) == 1
+ assert consumer.consume(limit=1) == 1
+ assert len(msgs) == 2
+
+ # The two acked messages are distinct and arrive in publish order.
+ assert msgs.id_for(0) != msgs.id_for(1)
+ assert msgs.id_for(0) == 0
+ assert msgs.id_for(1) == 1
+
+
+def test_fifo_reject_removes_message(pulse_producer):
+ consumer, msgs = build_consumer(pulse_producer,
+ 'fifo_reject_removes_message_queue')
+ produce_incrementing_messages(pulse_producer, 2)
+
+ consumer.callback = msgs.reject_callback
+ assert consumer.consume(limit=1) == 1
+ assert consumer.consume(limit=1) == 1
+ assert len(msgs) == 2
+
+ # The two rejected messages are distinct and arrive in publish order.
+ assert msgs.id_for(0) != msgs.id_for(1)
+ assert msgs.id_for(0) == 0
+ assert msgs.id_for(1) == 1
+
+
+def test_fifo_ignore_redelivers_message(pulse_producer):
+ consumer, msgs = build_consumer(pulse_producer,
+ 'fifo_ignore_redelivers_message_queue')
+ produce_incrementing_messages(pulse_producer, 2)
+ consumer.callback = msgs.ignore_callback
+
+ # We should be able to consume many more messages
+ # than there are currently queued since the same
+ # message should be redelivered every time it
+ # is not acked.
+ assert consumer.consume(limit=20) == 20
+
+ # Every delivery should carry the same message; entry 1
+ # serves as the reference to compare against.
+ for i, msg in enumerate(msgs):
+ assert msgs.id_for(i) == msgs.id_for(1)
+
+ # Clear the collected messages and consume the ignored
+ # message once more, as that's all we really need.
+ msgs.clear()
+ assert consumer.consume(limit=1) == 1
+ assert len(msgs) == 1
+
+ # Acking should actually take the message off
+ # the queue now.
+ consumer.callback = msgs.ack_callback
+ assert consumer.consume(limit=2) == 2
+ assert len(msgs) == 3
+
+ # The second message should match the previously
+ # ignored message, but since it was acked the
+ # third message should not be a duplicate.
+ assert msgs.id_for(1) == msgs.id_for(0)
+ assert msgs.id_for(1) != msgs.id_for(2)
+
+ # Both messages were acked, so the queue should be empty now.
+ assert consumer.consume(limit=1) == 0
+
+
+@pytest.mark.skip(reason="Properly handling requeue would require touching "
+ "private properties of the message. Ignoring the "
+ "message can be used in practice instead.")
+def test_fifo_requeue_redelivers_message(pulse_producer):
+ consumer, msgs = build_consumer(pulse_producer,
+ 'fifo_requeue_redelivers_message_queue')
+ produce_incrementing_messages(pulse_producer, 2)
+ consumer.callback = msgs.requeue_callback
+
+ # We should be able to consume many more messages
+ # than there are currently queued since the same
+ # message should be redelivered every time it
+ # is requeued.
+ assert consumer.consume(limit=20) == 20
+
+ # Every delivery should carry the same message; entry 1
+ # serves as the reference to compare against.
+ for i, msg in enumerate(msgs):
+ assert msgs.id_for(i) == msgs.id_for(1)
+
+ # Clear the collected messages and consume the requeued
+ # message once more, as that's all we really need.
+ msgs.clear()
+ assert consumer.consume(limit=1) == 1
+ assert len(msgs) == 1
+
+ # Acking should actually take the message off
+ # the queue now.
+ consumer.callback = msgs.ack_callback
+ assert consumer.consume(limit=2) == 2
+ assert len(msgs) == 3
+
+ # The second message should match the previously
+ # requeued message, but since it was acked the
+ # third message should not be a duplicate.
+ assert msgs.id_for(1) == msgs.id_for(0)
+ assert msgs.id_for(1) != msgs.id_for(2)
+
+ # Both messages were acked, so the queue should be empty now.
+ assert consumer.consume(limit=1) == 0
+