WIP Add SQS library (bug 1288282) draft
authorbyron jones <glob@mozilla.com>
Wed, 27 Sep 2017 14:58:58 +0800
changeset 11717 c242cadfee5601cd9ff82f272f6f14e7767f8b5b
parent 11715 866e7cfd28fed943ea08f9066058004ec7f96787
child 11718 d16e27b6766c648d9e66764fd6ed01622a552df1
push id1805
push userbjones@mozilla.com
push dateWed, 27 Sep 2017 07:33:57 +0000
bugs1288282
WIP Add SQS library (bug 1288282) Add a library for listening to an SQS queue, processing messages, and returning errors. MozReview-Commit-ID: BcyYtVEJE5T
vcssync/mozvcssync/sqs.py
new file mode 100644
--- /dev/null
+++ b/vcssync/mozvcssync/sqs.py
@@ -0,0 +1,144 @@
+# Portions derived from python-sqs-listener
+# https://github.com/jegesh/python-sqs-listener
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, print_function
+
+import json
+import logging
+import sys
+import time
+from abc import ABCMeta, abstractmethod
+
+import boto3
+import botocore
+
+logger = logging.getLogger('sqs')
+
+
+class SqsFatalException(Exception):
+    """
+    If a handler raises this exception, the message being processed will
+    be removed from the queue.
+    """
+
+
+class SqsListener(object):
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def handle_message(self, body):
+        """
+        Implement this method to do something with the SQS message contents
+        """
+
+    def __init__(self, region, queue, error_queue, **kwargs):
+        self._region_name = region
+        self._queue_name = queue
+        self._error_queue_name = error_queue
+
+        self._queue_url = None
+        self._error_queue_url = None
+
+        self._poll_interval = kwargs.get('interval', 60)
+        self._force_delete = kwargs.get('force_delete', False)
+
+        self._client = boto3.client('sqs', region_name=self._region_name)
+        self._find_queue_urls()
+
+    def _find_queue_urls(self):
+        queues = self._client.list_queues()
+        if 'QueueUrls' in queues:
+            for q in queues['QueueUrls']:
+                qname = q.split('/')[-1]
+                if qname == self._queue_name:
+                    self._queue_url = q
+                elif qname == self._error_queue_name:
+                    self._error_queue_url = q
+
+        if not self._queue_url:
+            raise Exception('queue "%s" not found' % self._queue_name)
+        if not self._error_queue_url:
+            raise Exception('queue "%s" not found' % self._error_queue_name)
+
+    def _delete_message(self, message):
+        logger.info('deleting message #%s' % message['MessageId'])
+        try:
+            self._client.delete_message(
+                QueueUrl=self._queue_url,
+                ReceiptHandle=message['ReceiptHandle']
+            )
+            return True
+        except botocore.exceptions.ClientError as e:
+            if 'The receipt handle has expired' in str(e):
+                logger.error('failed to delete message #%s: %s' %
+                             (message['MessageId'],
+                              "processing time exceeded queue's 'Visibility "
+                              "Timeout' resulting an an expired receipt "
+                              "handle."))
+            else:
+                logger.error('failed to delete message #%s: %s'
+                             % (message['MessageId'], e))
+            return False
+
+    def _process_message(self, message):
+        try:
+
+            try:
+                message_body = json.loads(message['Body'])
+            except ValueError:
+                raise SqsFatalException('malformed JSON')
+
+            if self._force_delete:
+                self._delete_message(message)
+                self.handle_message(message_body)
+
+            else:
+                self.handle_message(message_body)
+                self._delete_message(message)
+
+        except Exception as ex:
+            logger.exception(ex)
+            exc_type, exc_obj, exc_tb = sys.exc_info()
+
+            delete_message = isinstance(ex, SqsFatalException)
+            if delete_message:
+                delete_message = self._delete_message(message)
+
+            logger.info('pushing exception to error queue')
+            try:
+                self._client.send_message(
+                    QueueUrl=self._error_queue_url,
+                    MessageBody=json.dumps({
+                        'exception_type': str(exc_type),
+                        'error_message': str(ex.args),
+                        'message_id': message['MessageId'],
+                        'deleted': delete_message,
+                    }),
+                    MessageGroupId='error',
+                    MessageDeduplicationId=message['MessageId'],
+                )
+            except botocore.exceptions.ClientError as e:
+                logger.error('failed to send error message to #%s: %s'
+                             % (self._error_queue_url, e))
+
+    def listen(self):
+        logger.info('listening to queue %s, error queue %s'
+                    % (self._queue_url, self._error_queue_url))
+
+        while True:
+            messages = self._client.receive_message(QueueUrl=self._queue_url)
+            for message in messages.get('Messages', []):
+                logger.info('processing message #%s' % message['MessageId'])
+                self._process_message(message)
+
+            time.sleep(self._poll_interval)