2012-04-16 96 views
9

Estoy trabajando en una aplicación cuyo flujo de trabajo se gestiona pasando mensajes en SQS, usando boto.¿Cómo obtener todos los mensajes en la cola de Amazon SQS utilizando la biblioteca de boto en Python?

Mi cola SQS está creciendo gradualmente, y no tengo forma de comprobar cuántos elementos se supone que contiene.

Ahora tengo un daemon que periódicamente sondea la cola y compruebo si tengo un conjunto de elementos de tamaño fijo. Por ejemplo, considere el siguiente "cola":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

Ahora quiero comprobar si tengo, "msg2_comp1" y "msg3_comp1" en la cola juntos en algún momento en el tiempo "msg1_comp1", pero yo no' saber el tamaño de la cola.

Después de mirar a través de la API, parece que se puede obtener ya sea sólo el 1 elemento, o de un número fijo de elementos en la cola, pero no todos:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

Una sugerencia propuesta en las respuestas sería obtener, por ejemplo, 10 mensajes en un bucle hasta que no reciba nada, pero los mensajes en SQS tienen un tiempo de espera de visibilidad, lo que significa que si sondear elementos de la cola, no se eliminarán realmente, solo serán invisibles durante un breve período de tiempo.

¿Hay una manera simple de obtener todos los mensajes en la cola, sin saber cuántos hay?

Respuesta

13

poner su llamada a q.get_messages(n) dentro bucle while:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

Además, dump won't support more than 10 messages ya sea:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

Realmente no puedo hacer eso, ya que los mensajes de SQS tienen un tiempo de espera de visibilidad, por lo que si por primera vez llegar 10 mensajes, luego realice un bucle varias veces, la próxima vez recibiré los mismos 10 mensajes desde que se agotó el tiempo de espera. Estoy pensando en usar 'dump()' pero tendré que leer el archivo después, parece tonto, ¿me falta algo? (Podría configurar el visibility_timeout por mucho tiempo, pero eso parece feo). –

+0

@linker - dijiste que necesitas verificar 'n' mensajes específicos. ¿Esto significa que hay algunos criterios de coincidencia para los cuales está comparando cada mensaje? –

+0

Lo siento si eso fue confuso, he actualizado mi publicación. –

5

Mi entendimiento es que la naturaleza distribuida del servicio SQS más o menos hace que su diseño inviable. Cada vez que llamas a get_messages estás hablando con un conjunto diferente de servidores, que tendrán algunos, pero no todos tus mensajes. Por lo tanto, no es posible 'verificar de vez en cuando' para establecer si un determinado grupo de mensajes está listo, y luego simplemente aceptarlos.

Lo que necesita hacer es sondear continuamente, tomar todos los mensajes a medida que llegan y almacenarlos localmente en sus propias estructuras de datos. Después de cada búsqueda exitosa, puede verificar sus estructuras de datos para ver si se ha recopilado un conjunto completo de mensajes.

Tenga en cuenta que los mensajes se llegar fuera de orden, y algunos mensajes serán ser entregado dos veces, como eliminaciones tienen en propagarse a todos los servidores de SQS, pero subsiguientes peticiones GET veces superaron a los mensajes de borrado.

0

Algo como el siguiente código debería hacer el truco. Lo siento, está en C#, pero no debería ser difícil convertirlo a Python. El diccionario se usa para eliminar los duplicados.

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

He estado trabajando con colas de AWS SQS para proporcionar notificaciones instantáneas, así que tienen que ser de procesar todos los mensajes en tiempo real. El siguiente código le ayudará a delecuar de manera eficiente (todos) los mensajes y manejar cualquier error al eliminarlos.

Nota: para eliminar mensajes de la cola, debe eliminarlos.Estoy usando el boto3 actualizado AWS SDK Python, biblioteca JSON, y los siguientes valores por defecto:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

Una adaptación para los paquetes v2 'Boto' a" backport "la función' delete_messages' de 'Boto3' es [aquí] (http://stackoverflow.com/a/40638174/4228193). El 'Boto' incorporado (2)' delete_message_batch' tiene una limitación de 10 mensajes Y requiere objetos completos de la clase 'Mensaje', en lugar de solo' ID' y 'ReceiptHandles' en un objeto. – mpag

0

NOTA: Esto no pretende ser una respuesta directa a la pregunta. Más bien es un aumento a @TimothyLiu's answer, suponiendo que el usuario final está utilizando el paquete Boto (también conocido como Boto2) no Boto3. Este código es un "Boto-2-ización" de la llamada delete_messages se hace referencia en his answer


A Boto (2) llamada para delete_message_batch(messages_to_delete) donde messages_to_delete es un objeto dict con clave: valor correspondiente a id: receipt_handle pares devuelve

AttributeError: 'dict' object has no attribute 'id'.

Parece que delete_message_batch espera un objeto de clase Message; copiar el Boto source for delete_message_batch y permitirle usar un objeto que no sea Message (ala boto3) también falla si está eliminando más de 10 "mensajes" a la vez. Entonces, tuve que usar la siguiente solución.

código ePrint desde here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

que ejecutar esta tarea programada en un

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
Cuestiones relacionadas