Después de Pika timed received example, me gustaría que un cliente maneje más solicitudes concurrentes. Mi pregunta es si se podría llamar de alguna manera al handle_delivery cada vez que se recibe un nuevo mensaje y no se espera la devolución del handle_delivery anterior?Tratamiento asincrónico de mensajes por el cliente Pika RabbitMQ
5
A
Respuesta
2
Parece que la llamada a handle_delivery
está bloqueando, pero podría hacer que agregue un controlador secundario al bucle de evento de E/S usando add_timeout
. Creo que esto es lo que estás buscando hacer:
"""
Asyncronous amqp consumer; do our processing via an ioloop timeout
"""
import sys
import time
from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters
connection = None
channel = None
def on_connected(connection):
print "timed_receive: Connected to RabbitMQ"
connection.channel(on_channel_open)
def on_channel_open(channel_):
global channel
channel = channel_
print "timed_receive: Received our Channel"
channel.queue_declare(queue="test", durable=True,
exclusive=False, auto_delete=False,
callback=on_queue_declared)
class TimingHandler(object):
count = 0
last_count = 0
def __init__(self, delay=0):
self.start_time = time.time()
self.delay = delay
def handle_delivery(self, channel, method, header, body):
connection.add_timeout(self.delay, self)
def __call__(self):
self.count += 1
if not self.count % 1000:
now = time.time()
duration = now - self.start_time
sent = self.count - self.last_count
rate = sent/duration
self.last_count = self.count
self.start_time = now
print "timed_receive: %i Messages Received, %.4f per second" %\
(self.count, rate)
def on_queue_declared(frame):
print "timed_receive: Queue Declared"
channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)
if __name__ == '__main__':
# Connect to RabbitMQ
host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1'
connection = SelectConnection(ConnectionParameters(host),
on_connected)
# Loop until CTRL-C
try:
# Start our blocking loop
connection.ioloop.start()
except KeyboardInterrupt:
# Close the connection
connection.close()
# Loop until the connection is closed
connection.ioloop.start()
Cuestiones relacionadas
- 1. RabbitMQ, Pika y estrategia de reconexión
- 2. Pika + RabbitMQ: establecer basic_qos para prefetch = 1 todavía parece consumir todos los mensajes en la cola
- 3. Mensajes de reordenación de RabbitMQ
- 4. Clojure manejo de mensajes/asincrónico, multiproceso
- 5. Verificar el tamaño de cola de RabbitMQ desde el cliente
- 6. Los consumidores de mensajes de RabbitMQ dejan de consumir mensajes
- 7. Uso de Aplery con mensajes RabbitMQ existentes
- 8. ¿Por qué RabbitMQ no persiste mensajes en una cola duradera?
- 9. ¿Cuándo usa rabbitmq la contrapresión de tcp?
- 10. RabbitMQ: los mensajes permanecen "No reconocidos"
- 11. ¿qué cliente de rubí es el más estable para rabbitmq?
- 12. ¿Hay alguna forma de enumerar las colas en un rabbitmq vía pika?
- 13. Mongoose asincrónico por defecto
- 14. Recuperar mensajes de la (s) cola (s) de RabbitMQ
- 15. RabbitMQ-- recuperación selectiva de mensajes de una cola
- 16. Bloqueos y mensajes de recuperación de lotes con RabbitMq
- 17. Uso de EasyNetQ con RabbitMQ para publicar y recibir mensajes
- 18. Tratamiento de errores del servidor SQL: excepciones y el contrato cliente-base de datos
- 19. ¿Por qué mis canales RabbitMQ siguen cerrando?
- 20. Interceptar mensajes en un cliente de WCF
- 21. Uso de msmq para el registro asincrónico
- 22. Tratamiento de nombres mongoles
- 23. Uso de Tornado con Pika para monitoreo de colas asíncronas
- 24. Maximice el rendimiento con RabbitMQ
- 25. RabbitMQ + Memory Límites
- 26. RabbitMQ velocidades de transferencia aceleran?
- 27. ¿Cómo puedo verificar si existe o no una cola de mensajes RabbitMQ?
- 28. RabbitMQ negarse a iniciar RabbitMQ
- 29. ¿Por qué usar Apio en lugar de RabbitMQ?
- 30. Qué forma de conexión usar con pika