NOTA: Esto no pretende ser una respuesta directa a la pregunta. Más bien es un aumento a @TimothyLiu's answer, suponiendo que el usuario final está utilizando el paquete Boto
(también conocido como Boto2) no Boto3
. Este código es un "Boto-2-ización" de la llamada delete_messages
se hace referencia en his answer
A
Boto
(2) llamada para
delete_message_batch(messages_to_delete)
donde
messages_to_delete
es un objeto
dict
con clave: valor correspondiente a
id
:
receipt_handle
pares devuelve
AttributeError: 'dict' object has no attribute 'id'.
Parece que delete_message_batch
espera un objeto de clase Message
; copiar el Boto source for delete_message_batch
y permitirle usar un objeto que no sea Message
(ala boto3) también falla si está eliminando más de 10 "mensajes" a la vez. Entonces, tuve que usar la siguiente solución.
código ePrint desde here
from __future__ import print_function
import sys
from itertools import islice
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@static_vars(counter=0)
def take(n, iterable, reset=False):
"Return next n items of the iterable as same type"
if reset: take.counter = 0
take.counter += n
bob = islice(iterable, take.counter-n, take.counter)
if isinstance(iterable, dict): return dict(bob)
elif isinstance(iterable, list): return list(bob)
elif isinstance(iterable, tuple): return tuple(bob)
elif isinstance(iterable, set): return set(bob)
elif isinstance(iterable, file): return file(bob)
else: return bob
def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
"""
Deletes a list of messages from a queue in a single request.
:param cx: A boto connection object.
:param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
:param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
"""
listof10s = []
asSuc, asErr, acS, acE = "","",0,0
res = []
it = tuple(enumerate(messages))
params = {}
tenmsg = take(10,it,True)
while len(tenmsg)>0:
listof10s.append(tenmsg)
tenmsg = take(10,it)
while len(listof10s)>0:
tenmsg = listof10s.pop()
params.clear()
for i, msg in tenmsg: #enumerate(tenmsg):
prefix = 'DeleteMessageBatchRequestEntry'
numb = (i%10)+1
p_name = '%s.%i.Id' % (prefix, numb)
params[p_name] = msg.get('id')
p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
params[p_name] = msg.get('receipt_handle')
try:
go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
(sSuc,cS),(sErr,cE) = tup_result_messages(go)
if cS:
asSuc += ","+sSuc
acS += cS
if cE:
asErr += ","+sErr
acE += cE
except cx.ResponseError:
eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
except:
eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res
def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
if sSuc == "": sSuc="None"
if sErr == "": sErr="None"
if cS == expect: sSuc="All"
if cE == expect: sErr="All"
return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)
Realmente no puedo hacer eso, ya que los mensajes de SQS tienen un tiempo de espera de visibilidad, por lo que si por primera vez llegar 10 mensajes, luego realice un bucle varias veces, la próxima vez recibiré los mismos 10 mensajes desde que se agotó el tiempo de espera. Estoy pensando en usar 'dump()' pero tendré que leer el archivo después, parece tonto, ¿me falta algo? (Podría configurar el visibility_timeout por mucho tiempo, pero eso parece feo). –
@linker - dijiste que necesitas verificar 'n' mensajes específicos. ¿Esto significa que hay algunos criterios de coincidencia para los cuales está comparando cada mensaje? –
Lo siento si eso fue confuso, he actualizado mi publicación. –