2012-04-16 1 views
9

Я работаю над приложением, чей рабочий процесс управляется передачей сообщений в SQS, используя boto.Как получить все сообщения в очереди Amazon SQS с помощью библиотеки boto в Python?

Моя очередь SQS постепенно растет, и у меня нет возможности проверить, сколько элементов она должна содержать.

Теперь у меня есть демон, который периодически обследует очередь и проверяет, есть ли у меня набор элементов фиксированного размера. Например, рассмотрим следующую «очередь»:

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

Теперь я хочу, чтобы проверить, есть ли у меня «msg1_comp1», «msg2_comp1» и «msg3_comp1» в очереди вместе в какой-то момент времени, но я не» t знать размер очереди.

После просмотра через API, кажется, вы можете получить только один элемент, или фиксированное количество элементов в очереди, но не все:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

Предложение предложено в ответах бы на получите, например, 10 сообщений в цикле до тех пор, пока я ничего не получу, но сообщения в SQS имеют тайм-аут видимости, а это означает, что если я опросу элементов из очереди, они не будут действительно удалены, они будут невидимыми только на короткий период времени.

Есть ли простой способ получить все сообщения в очереди, не зная, сколько их есть?

ответ

13

Поместите свой вызов q.get_messages(n) внутри во время цикла:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

Кроме того, dump won't support more than 10 messages либо:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

Я не могу сделать это, так как сообщения в SQS имеют тайм-аут видимости, так что если я сначала получить 10 сообщений, затем цикл несколько раз, в следующий раз я могу получать те же 10 сообщений, что и тайм-аут. Я думаю об использовании 'dump()', но мне нужно будет прочитать файл после, что кажется глупым, я что-то упускаю? (Я могу установить visibility_timeout на очень долгое время, но это кажется уродливым). –

+0

@linker - вы сказали, что вам нужно проверить «n» конкретные сообщения. означает ли это, что есть некоторые критерии соответствия, по которым вы сравниваете каждое сообщение? –

+0

Извините, если это было непонятно, я обновил свой пост. –

5

Я понимаю, что распределенный характер службы SQS довольно много делает ваш дизайн неработоспособной. Каждый раз, когда вы вызываете get_messages, вы говорите с другим набором серверов, на котором будут некоторые, но не все ваши сообщения. Таким образом, невозможно «время от времени проверять», чтобы установить, готова ли определенная группа сообщений, а затем просто принять их.

Что нужно сделать, это проводить опрос непрерывно, принимать все сообщения по мере их поступления и хранить их локально в своих собственных структурах данных. После каждой успешной выборки вы можете проверить свои структуры данных, чтобы узнать, был ли собран полный набор сообщений.

Имейте в виду, что сообщения будет прибыть из строя, и некоторые сообщения будут быть доставлены в два раза, а удалений должны распространяться на все серверы SQS, но последующие запросы получают иногда выбивать удалять сообщения.

0

Что-то вроде приведенного ниже кода должно делать трюк. Извините, что он находится на C#, но преобразовать его в python не сложно. Словарь используется для отсечения дубликатов.

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

Я работал с очередями AWS SQS обеспечить мгновенные уведомления, поэтому мне нужно обрабатывать все сообщения в режиме реального времени. Следующий код поможет вам эффективно деактивировать (все) сообщения и обрабатывать любые ошибки при удалении.

Примечание: для удаления сообщений из очереди вам необходимо их удалить.Я использую обновленный boto3 AWS SDK питона, библиотеку JSON, а следующие значения по умолчанию:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

Адаптация для пакетов v2 'Boto' для« резервного копирования »функции' delete_messages' из 'Boto3' является [здесь] (http://stackoverflow.com/a/40638174/4228193). Встроенная функция 'Boto' (2)' delete_message_batch' имеет ограничение в 10 сообщений и требует полных объектов класса 'Message', а не только' ID' и 'ReceiptHandles' в объекте. – mpag

0

ПРИМЕЧАНИЕ: Это не предназначено в качестве прямого ответа на вопрос. Скорее это увеличение до @TimothyLiu's answer, предполагая, что конечный пользователь использует пакет Boto (aka Boto2) не Boto3. Этот код является «Boto-2-зации» от delete_messages вызова, указанного в his answer


Boto (2) призыв к delete_message_batch(messages_to_delete) где messages_to_delete является dict объект с ключом: значение, соответствующее id: receipt_handle пар возвращается

AttributeError: 'dict' object has no attribute 'id'.

Похоже, delete_message_batch ожидает Message класс объекта; копирование Boto source for delete_message_batch и возможность использования объекта не Message (ala boto3) также не удается, если вы удаляете более 10 «сообщений» за раз. Таким образом, мне пришлось использовать следующий подход.

Eprint код из here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

я исполню это в cronjob

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
Смежные вопросы