mozreviewpulse: add a FIFO pulse consumer (Bug 1287537). r?mars draft
author Steven MacLeod <smacleod@mozilla.com>
Mon, 28 Nov 2016 10:09:36 -0500
changeset 10158 7a4ce6d2bc1d3fe8b6e088e89efd2504805a9ff6
parent 9988 da93498975d1f1bf67c70a4fd13612e60cc66f9c
child 10159 b99c1cab2f141d00ad296496a87cf7d08133c352
push id 1453
push user smacleod@mozilla.com
push date Sat, 14 Jan 2017 01:14:15 +0000
reviewers mars
bugs 1287537
mozreviewpulse: add a FIFO pulse consumer (Bug 1287537). r?mars This lays the groundwork for having mozreview consume various messages from pulse. MozReview-Commit-ID: 9x5usNSf1R3
mozreviewpulse/mozreviewpulse/consumer.py
mozreviewpulse/tests/conftest.py
mozreviewpulse/tests/test_consumer.py
new file mode 100644
--- /dev/null
+++ b/mozreviewpulse/mozreviewpulse/consumer.py
@@ -0,0 +1,167 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import logging
+import Queue
+
+import kombu
+
+
+class NoQueueDefined(RuntimeError):
+    """Error for an attempt to consume when no queue is defined."""
+
+
+class FIFOPulseConsumer(object):
+    """A pulse consumer which consumes in FIFO order.
+
+    This consumer is designed to be the sole consumer of
+    a pulse queue. If other processes are able to consume
+    from the queue at the same time there is no guarantee
+    of FIFO processing of the queue.
+
+    In the event that processing of a message cannot be
+    completed, but it should be retried as the next
+    message of the queue, the callback should not ack
+    or reject the message. An ignored message will
+    stay at the head of the queue until it is rejected
+    or acked.
+
+    Messages should never be requeued (also known as
+    rejected with requeue), since the delivery mechanics
+    are not as well defined and it could violate FIFO
+    ordering. Instead the message should not be acked
+    or nacked, which will cause it to be processed again
+    as previously mentioned.
+    """
+
+    def __init__(self, host, port, userid=None, password=None,
+                 ssl=False, timeout=1.0, exchange=None, queue=None,
+                 routing_key=None, callback=None, logger=None, **kwargs):
+        """Initialize the consumer.
+
+        Args:
+            host (str): The Pulse hostname.
+            port (int): The Pulse server port.
+            userid (str): The Pulse user to authenticate as.
+            password (str): The Pulse user password for authentication.
+            ssl (bool): Should ssl be used when connecting to Pulse.
+            timeout (float): The timeout in seconds to use when fetching
+                messages from the Pulse server.
+            exchange (str): The Pulse exchange name where messages are
+                produced.
+            queue (str): The Pulse queue to consume from.
+            routing_key (str): The routing key to use for the queue.
+            callback (callable): A callable to be called for each message. The
+                callable will be passed the message as the first argument, and
+                the consumer instance through the `consumer` kwarg.
+            logger (logging.Logger): The logger to use for logging.
+                Defaults to this module's logger.
+            **kwargs: Any additional keyword arguments are ignored.
+        """
+        self.host = host
+        self.port = port
+        self.userid = userid
+        self.password = password
+        self.ssl = ssl
+        self.timeout = float(timeout)
+
+        self.exchange = exchange
+        self.queue = queue
+        self.routing_key = routing_key
+        self.callback = callback
+
+        self.logger = logger or logging.getLogger(__name__)
+
+        # Number of messages handled by the current consume() call.
+        # Set here as well so the attribute exists before the first
+        # call to consume().
+        self._processed = 0
+
+    def _create_connection(self):
+        """Create a kombu.Connection from the configured settings."""
+        return kombu.Connection(
+            hostname=self.host, port=self.port, userid=self.userid,
+            password=self.password, ssl=self.ssl)
+
+    def _create_simple_queue(self, connection):
+        """Create a kombu.SimpleQueue from a kombu.Connection."""
+        exchange = kombu.Exchange(
+            self.exchange, type='topic', durable=True, passive=True)
+        queue = kombu.Queue(
+            name=self.queue, exchange=exchange,
+            durable=True, routing_key=self.routing_key,
+            exclusive=False, auto_delete=False)
+        return connection.SimpleQueue(queue)
+
+    def _process_message(self, msg, connection, simple_queue):
+        """Process a message.
+
+        The message is handed to the callback and counted as
+        processed. `connection` and `simple_queue` are currently
+        unused.
+
+        Return True if the consumer must reconnect before
+        consuming another message or False if the consumer
+        is fine to continue with the same connection.
+        """
+        self.callback(msg, consumer=self)
+        self._processed += 1
+
+        # If the callback did not ack/nack the message we
+        # must reconnect before consuming another message
+        # so that the same message is redelivered.
+        #
+        # We could deal with requeuing in the same way, but
+        # differentiating between a requeue and the other
+        # ack states requires touching "private" properties
+        # of the message. It would not provide any meaningful
+        # extra functionality though, since it would act the
+        # same as unacknowledged messages after a reconnect.
+        return not msg.acknowledged
+
+    def _consume(self, connection, limit=None):
+        """Attempt to consume a number of messages.
+
+        Connection errors raised while consuming are logged and
+        reported as "continue" so that the caller reconnects.
+
+        Return True if consuming should continue or False if
+        it should stop.
+        """
+        simple_queue = self._create_simple_queue(connection)
+
+        try:
+            while limit is None or self._processed < limit:
+                try:
+                    msg = simple_queue.get(timeout=self.timeout)
+                except Queue.Empty:
+                    # Nothing arrived within the timeout. A limited
+                    # run treats that as an empty queue and stops;
+                    # an unlimited run keeps waiting.
+                    if limit is not None:
+                        return False
+                    continue
+
+                if self._process_message(msg, connection, simple_queue):
+                    # We must reconnect so that we reconsume
+                    # this message again.
+                    return True
+
+        except connection.recoverable_connection_errors as exc:
+            self.logger.warning(
+                'Recoverable pulse connection error: %s', exc)
+        except connection.connection_errors as exc:
+            # TODO: Should we implement some sort of backoff
+            # for these irrecoverable errors? For now log
+            # them and just keep trying to connect in
+            # consume()
+            self.logger.error('Pulse connection error: %s', exc)
+
+        return True
+
+    def consume(self, limit=None):
+        """Consume pulse messages.
+
+        If a `limit` is provided a `limit` number of messages will be
+        consumed or consumption will stop when the queue is empty,
+        whichever comes first. Otherwise, `consume` will block and wait
+        for messages, consuming indefinitely.
+
+        The number of messages processed in a call to consume
+        will be returned.
+
+        Raises:
+            NoQueueDefined: If the consumer has no queue to consume from.
+        """
+        if not self.queue:
+            raise NoQueueDefined('No queue defined to consume from.')
+
+        self._processed = 0
+
+        while limit is None or self._processed < limit:
+            connection = self._create_connection()
+
+            # A fresh connection is used for every pass so that
+            # unacknowledged messages are delivered again.
+            with connection.ensure_connection() as connection:
+                if not self._consume(connection, limit=limit):
+                    break
+
+        return self._processed
--- a/mozreviewpulse/tests/conftest.py
+++ b/mozreviewpulse/tests/conftest.py
@@ -72,8 +72,19 @@ def pulse_server(request, docker):
 @pytest.fixture
 def pulse_conn(request, pulse_server):
     conn = kombu.Connection(pulse_server, port=5672)
     request.addfinalizer(conn.release)
     # Wait for the service to come up
     conn.ensure_connection(
         max_retries=10, interval_start=0.3, interval_step=0.3)
     return conn
+
+
+@pytest.fixture
+def pulse_producer(pulse_conn):
+    """Return a producer publishing to the 'exchange/mrp/' topic exchange."""
+    publisher = kombu.Producer(
+        pulse_conn, exchange=kombu.Exchange('exchange/mrp/', type='topic'))
+
+    # Declare the exchange up front so that consumers can
+    # start listening before any message is published.
+    publisher.maybe_declare(publisher.exchange)
+    return publisher
new file mode 100644
--- /dev/null
+++ b/mozreviewpulse/tests/test_consumer.py
@@ -0,0 +1,196 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+Pulse Consumer tests.
+"""
+import pytest
+
+from mozreviewpulse.consumer import FIFOPulseConsumer
+
+
+class MessageCollector(list):
+    """A list that records pulse messages via its callback methods."""
+
+    def clear(self):
+        """Remove every recorded message."""
+        self[:] = []
+
+    def ack_callback(self, message, consumer=None, *args, **kwargs):
+        """Record the message, then ack it."""
+        self.append(message)
+        message.ack()
+
+    def reject_callback(self, message, consumer=None, *args, **kwargs):
+        """Record the message, then reject it."""
+        self.append(message)
+        message.reject()
+
+    def requeue_callback(self, message, consumer=None, *args, **kwargs):
+        """Record the message, then requeue it."""
+        self.append(message)
+        message.requeue()
+
+    def ignore_callback(self, message, consumer=None, *args, **kwargs):
+        """Record the message without acking or rejecting it."""
+        self.append(message)
+
+    def id_for(self, i):
+        """Return the 'id' from the payload of the i-th recorded message."""
+        recorded = self[i]
+        return recorded.payload['id']
+
+
+def build_consumer(producer, queue, routing_key='testmessage',
+                   consumer_class=FIFOPulseConsumer, **kwargs):
+    """Build a consumer, and its message collector, for a producer.
+
+    The consumer is created with the collector's acking callback.
+    Build the consumer before producing any messages.
+
+    Returns a (consumer, collector) tuple.
+    """
+    collected = MessageCollector()
+    conn = producer.connection
+    fifo = consumer_class(
+        host=conn.hostname, port=conn.port or 5672,
+        exchange=producer.exchange.name, queue=queue,
+        routing_key=routing_key, callback=collected.ack_callback,
+        **kwargs)
+
+    # Try to consume a single message. This will time out with
+    # nothing consumed, but it gets our queue declared.
+    consumed = fifo.consume(limit=1)
+    assert consumed == 0
+    assert not collected
+    return fifo, collected
+
+
+def produce_incrementing_messages(producer, n, routing_key='testmessage'):
+    """Publish n messages on the producer with ids 0 through n - 1."""
+    payloads = ({'id': message_id} for message_id in xrange(n))
+    for payload in payloads:
+        producer.publish(payload, routing_key=routing_key, retry=True)
+
+
+def test_limit_messages_consumed(pulse_producer):
+    """consume() honours `limit` and stops once the queue is empty."""
+    fifo, seen = build_consumer(
+        pulse_producer, 'limit_messages_consumed_queue')
+
+    # Queue up three messages for the consumer.
+    produce_incrementing_messages(pulse_producer, 3)
+
+    # A limit of 1 consumes exactly one message.
+    consumed = fifo.consume(limit=1)
+    assert consumed == 1
+    assert len(seen) == 1
+
+    # With a limit above the number of queued messages,
+    # consumption stops when the queue is empty.
+    consumed = fifo.consume(limit=10)
+    assert consumed == 2
+    assert len(seen) == 3
+
+
+def test_fifo_ack_removes_message(pulse_producer):
+    """Acked messages are consumed once each, in publish order."""
+    fifo, seen = build_consumer(
+        pulse_producer, 'fifo_ack_removes_message_queue')
+    produce_incrementing_messages(pulse_producer, 2)
+
+    fifo.callback = seen.ack_callback
+    for _ in range(2):
+        assert fifo.consume(limit=1) == 1
+    assert len(seen) == 2
+
+    # The two messages consumed are not duplicates.
+    first_id, second_id = seen.id_for(0), seen.id_for(1)
+    assert first_id != second_id
+    assert first_id == 0
+    assert second_id == 1
+
+
+def test_fifo_reject_removes_message(pulse_producer):
+    """Rejected messages are consumed once each, in publish order."""
+    fifo, seen = build_consumer(
+        pulse_producer, 'fifo_reject_removes_message_queue')
+    produce_incrementing_messages(pulse_producer, 2)
+
+    fifo.callback = seen.reject_callback
+    for _ in range(2):
+        assert fifo.consume(limit=1) == 1
+    assert len(seen) == 2
+
+    # The two messages consumed are not duplicates.
+    first_id, second_id = seen.id_for(0), seen.id_for(1)
+    assert first_id != second_id
+    assert first_id == 0
+    assert second_id == 1
+
+
+def test_fifo_ignore_redelivers_message(pulse_producer):
+    """An ignored message is redelivered until it is finally acked."""
+    fifo, seen = build_consumer(
+        pulse_producer, 'fifo_ignore_redelivers_message_queue')
+    produce_incrementing_messages(pulse_producer, 2)
+    fifo.callback = seen.ignore_callback
+
+    # Far more messages can be consumed than are queued,
+    # because a message that is not acked is delivered
+    # again every time.
+    consumed = fifo.consume(limit=20)
+    assert consumed == 20
+
+    # Every delivery should have been the same message.
+    delivered_ids = [message.payload['id'] for message in seen]
+    assert all(an_id == delivered_ids[1] for an_id in delivered_ids)
+
+    # Start from a clean slate and consume just one more
+    # message, as that's all we really need.
+    seen.clear()
+    consumed = fifo.consume(limit=1)
+    assert consumed == 1
+    assert len(seen) == 1
+
+    # Now that the callback acks, messages should actually
+    # be taken off the queue.
+    fifo.callback = seen.ack_callback
+    consumed = fifo.consume(limit=2)
+    assert consumed == 2
+    assert len(seen) == 3
+
+    # The second message should match the previously
+    # ignored message, but since it was acked the
+    # third message should not be a duplicate.
+    ignored_id, acked_id, next_id = [seen.id_for(n) for n in range(3)]
+    assert acked_id == ignored_id
+    assert acked_id != next_id
+
+    # The queue should be empty now.
+    assert fifo.consume(limit=1) == 0
+
+
+@pytest.mark.skip(reason="Properly handling requeue would require touching "
+                         "private properties of the message. Ignoring the "
+                         "message can be used in practice instead.")
+def test_fifo_requeue_redelivers_message(pulse_producer):
+    """A requeued message is redelivered until it is finally acked."""
+    fifo, seen = build_consumer(
+        pulse_producer, 'fifo_requeue_redelivers_message_queue')
+    produce_incrementing_messages(pulse_producer, 2)
+    fifo.callback = seen.requeue_callback
+
+    # Far more messages can be consumed than are queued,
+    # because a message that is requeued is delivered
+    # again every time.
+    consumed = fifo.consume(limit=20)
+    assert consumed == 20
+
+    # Every delivery should have been the same message.
+    delivered_ids = [message.payload['id'] for message in seen]
+    assert all(an_id == delivered_ids[1] for an_id in delivered_ids)
+
+    # Start from a clean slate and consume just one more
+    # message, as that's all we really need.
+    seen.clear()
+    consumed = fifo.consume(limit=1)
+    assert consumed == 1
+    assert len(seen) == 1
+
+    # Now that the callback acks, messages should actually
+    # be taken off the queue.
+    fifo.callback = seen.ack_callback
+    consumed = fifo.consume(limit=2)
+    assert consumed == 2
+    assert len(seen) == 3
+
+    # The second message should match the previously
+    # requeued message, but since it was acked the
+    # third message should not be a duplicate.
+    requeued_id, acked_id, next_id = [seen.id_for(n) for n in range(3)]
+    assert acked_id == requeued_id
+    assert acked_id != next_id
+
+    # The queue should be empty now.
+    assert fifo.consume(limit=1) == 0
+