WIP Add SQS library (
bug 1288282)
Add a library for listening to an SQS queue, processing messages, and returning
errors.
MozReview-Commit-ID: BcyYtVEJE5T
new file mode 100644
--- /dev/null
+++ b/vcssync/mozvcssync/sqs.py
@@ -0,0 +1,144 @@
+# Portions derived from python-sqs-listener
+# https://github.com/jegesh/python-sqs-listener
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, print_function
+
+import json
+import logging
+import sys
+import time
+from abc import ABCMeta, abstractmethod
+
+import boto3
+import botocore
+
+logger = logging.getLogger('sqs')
+
+
+class SqsFatalException(Exception):
+ """
+ If a handler raises this exception, the message being processed will
+ be removed from the queue.
+ """
+
+
+class SqsListener(object):
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def handle_message(self, body):
+ """
+ Implement this method to do something with the SQS message contents
+ """
+
+ def __init__(self, region, queue, error_queue, **kwargs):
+ self._region_name = region
+ self._queue_name = queue
+ self._error_queue_name = error_queue
+
+ self._queue_url = None
+ self._error_queue_url = None
+
+ self._poll_interval = kwargs.get('interval', 60)
+ self._force_delete = kwargs.get('force_delete', False)
+
+ self._client = boto3.client('sqs', region_name=self._region_name)
+ self._find_queue_urls()
+
+ def _find_queue_urls(self):
+ queues = self._client.list_queues()
+ if 'QueueUrls' in queues:
+ for q in queues['QueueUrls']:
+ qname = q.split('/')[-1]
+ if qname == self._queue_name:
+ self._queue_url = q
+ elif qname == self._error_queue_name:
+ self._error_queue_url = q
+
+ if not self._queue_url:
+ raise Exception('queue "%s" not found' % self._queue_name)
+ if not self._error_queue_url:
+ raise Exception('queue "%s" not found' % self._error_queue_name)
+
+ def _delete_message(self, message):
+ logger.info('deleting message #%s' % message['MessageId'])
+ try:
+ self._client.delete_message(
+ QueueUrl=self._queue_url,
+ ReceiptHandle=message['ReceiptHandle']
+ )
+ return True
+ except botocore.exceptions.ClientError as e:
+ if 'The receipt handle has expired' in str(e):
+ logger.error('failed to delete message #%s: %s' %
+ (message['MessageId'],
+ "processing time exceeded queue's 'Visibility "
+ "Timeout' resulting an an expired receipt "
+ "handle."))
+ else:
+ logger.error('failed to delete message #%s: %s'
+ % (message['MessageId'], e))
+ return False
+
+ def _process_message(self, message):
+ try:
+
+ try:
+ message_body = json.loads(message['Body'])
+ except ValueError:
+ raise SqsFatalException('malformed JSON')
+
+ if self._force_delete:
+ self._delete_message(message)
+ self.handle_message(message_body)
+
+ else:
+ self.handle_message(message_body)
+ self._delete_message(message)
+
+ except Exception as ex:
+ logger.exception(ex)
+ exc_type, exc_obj, exc_tb = sys.exc_info()
+
+ delete_message = isinstance(ex, SqsFatalException)
+ if delete_message:
+ delete_message = self._delete_message(message)
+
+ logger.info('pushing exception to error queue')
+ try:
+ self._client.send_message(
+ QueueUrl=self._error_queue_url,
+ MessageBody=json.dumps({
+ 'exception_type': str(exc_type),
+ 'error_message': str(ex.args),
+ 'message_id': message['MessageId'],
+ 'deleted': delete_message,
+ }),
+ MessageGroupId='error',
+ MessageDeduplicationId=message['MessageId'],
+ )
+ except botocore.exceptions.ClientError as e:
+ logger.error('failed to send error message to #%s: %s'
+ % (self._error_queue_url, e))
+
+ def listen(self):
+ logger.info('listening to queue %s, error queue %s'
+ % (self._queue_url, self._error_queue_url))
+
+ while True:
+ messages = self._client.receive_message(QueueUrl=self._queue_url)
+ for message in messages.get('Messages', []):
+ logger.info('processing message #%s' % message['MessageId'])
+ self._process_message(message)
+
+ time.sleep(self._poll_interval)