2012-08-20 14 views
7

¿Alguien conoce una forma clara de obtener un comportamiento de LIFO cercano o incluso no cercano a FIFO (por ejemplo, aleatorio) desde multiprocessing.Queue?Manera limpia de obtener un comportamiento de LIFO cercano del multiproceso. ¿Qué? (o simplemente * no * near-FIFO)

Pregunta alternativa: ¿Alguien podría indicarme el código del hilo que gestiona la estructura de almacenamiento real detrás de multiprocessing.Queue? Parece que sería trivial proporcionar acceso a LIFO, pero me perdí en el agujero de conejo tratando de encontrarlo.

Notas:

  1. creo multiprocessing.Queuedoes not guarantee order. Multa. Pero está cerca de FIFO tan cerca de LIFO sería genial.
  2. Podría retirar todos los artículos actuales de la cola e invertir el orden antes de trabajar con ellos, pero prefiero evitar un kludge si es posible.

(EDIT) para aclarar: estoy haciendo una simulación depende de la CPU con multiprocessing y así no puede utilizar las colas especializadas de Queue. Como no he visto ninguna respuesta durante unos días, agregué la pregunta alternativa anterior.


En caso de ser un problema, a continuación hay una ligera evidencia de que multiprocessing.Queue es casi FIFO. Esto demuestra que en un caso simple (un solo hilo), es perfectamente FIFO en mi sistema:

import multiprocessing as mp 
import Queue 

q = mp.Queue() 

for i in xrange(1000): 
    q.put(i) 

deltas = [] 
while True: 
    try: 
     value1 = q.get(timeout=0.1) 
     value2 = q.get(timeout=0.1) 
     deltas.append(value2-value1) 
    except Queue.Empty: 
     break 

#positive deltas would indicate the numbers are coming out in increasing order 
min_delta, max_delta = min(deltas), max(deltas) 
avg_delta = sum(deltas)/len(deltas) 

print "min", min_delta 
print "max", max_delta 
print "avg", avg_delta 

impresiones: mínimo, máximo y promedio son exactamente 1 (FIFO perfecto)

+1

prueba inteligente ... – mgilson

+0

¿Solo necesita los datos de LIFO después de que se hayan realizado todas las adiciones o desea obtener los últimos datos mientras se agregan nuevos valores? Si el primero, creo que invertir el contenido de la cola es más fácil. Si "vives" el acceso a LIFO, probablemente necesitarás escribir tu propia estructura de datos usando las primitivas de memoria compartida del módulo 'multiprocesamiento'. – Blckknght

+0

@Blckknght Sí, si puedo esperar hasta que todo esté allí, entonces es bastante sencillo (opción 2) pero es una simulación en curso en la que quiero que la cola actúe aproximadamente como una pila. Eché un vistazo rápido a las primitivas, con la esperanza de personalizar el hilo de gestión de colas, pero no pude encontrar cara o cruz. Investigar ese es mi siguiente paso si no puedo encontrar una manera fácil. ¡Gracias por el comentario! – KobeJohn

Respuesta

2

He mirado sobre la clase de cola que vive en Lib/multiprocessing/queues.py en mi instalación de Python (Python 2.7, pero nada obvio es diferente en la versión de Python 3.2 que revisé brevemente). He aquí cómo entiendo que funciona:

Hay dos conjuntos de objetos que mantiene el objeto Queue. Un conjunto son primativos seguros multiproceso que son compartidos por todos los procesos. Los demás son creados y utilizados por separado por cada proceso.

El proceso transversal objetos se configuran en el método __init__:

  1. Un objeto Pipe, que es dos extremos se guardan como self._reader y self._writer.
  2. Un objeto BoundedSemaphore, que cuenta (y opcionalmente limita) la cantidad de objetos en la cola.
  3. Un objeto Lock para leer el Tubo, y en plataformas que no son de Windows, otra para escribir.(Supongo que esto se debe escribir en un tubo es inherentemente multiproceso de fallos en Windows.)

Los objetos por proceso se establecen en los métodos _after_fork y _start_thread:

  1. Un objeto collections.deque utilizado para almacenar las escrituras en el Pipe.
  2. A threading.condition objeto utilizado para indicar cuando el búfer no está vacío.
  3. Un objeto threading.Thread que hace la escritura real. Se crea de forma perezosa, por lo que no existirá hasta que se haya solicitado al menos una escritura en la Cola en un proceso determinado.
  4. Varios Finalize objetos que limpian cosas cuando termina el proceso.

A get de la cola es bastante simple. Adquiere el bloqueo de lectura, disminuye el semáforo y toma un objeto del extremo de lectura del conducto.

A put es más complicado. Utiliza múltiples hilos. La persona que llama al put toma el bloqueo de la condición, luego agrega su objeto al búfer y señala la condición antes de desbloquearlo. También incrementa el semáforo e inicia el hilo del escritor si todavía no se está ejecutando.

El hilo del escritor gira por siempre (hasta que se cancele) en el método . Si el búfer está vacío, espera en la condición notempty. Luego toma un elemento del buffer, adquiere el bloqueo de escritura (si existe) y escribe el elemento en el Pipe.


Entonces, teniendo en cuenta todo eso, ¿puede modificarlo para obtener una cola LIFO? No parece fácil. Las tuberías son intrínsecamente objetos FIFO, y aunque la cola no puede garantizar el comportamiento FIFO en general (debido a la naturaleza asíncrona de las escrituras de múltiples procesos) siempre va a ser principalmente FIFO.

Si solo tiene un solo consumidor, puede get objetos de la cola y agregarlos a su propia pila de proceso local. Sería más difícil hacer una pila de varios consumidores, aunque con la memoria compartida, una pila de tamaño limitado no sería demasiado difícil. Necesitará un bloqueo, un par de condiciones (para bloquear/señalizar en estados completos y vacíos), un valor entero compartido (para el número de valores retenidos) y una matriz compartida de un tipo apropiado (para los valores mismos).

+0

Cuando tenga tiempo para trabajar en esto, seguiré su excelente recorrido por el mp.Queue y veré lo que puedo ver.Esperaba que la información se guardara en algún objeto que fuera fácil de usar, pero veo que es una tubería como dijiste. En cualquier caso, supongo que su explicación aquí es la mejor en Internet sobre cómo funciona mp.Queue. Muchas gracias. – KobeJohn

1

Hay a LIFO queue en el paquete Queue (cola en Python 3). Esto no está expuesto en los módulos multiprocesamiento o multiprocesamiento.queues.

Reemplazando su línea q = mp.Queue() con y ejecutando impresiones: mín., Máx. Y promedio exactamente igual a -1.

(También creo que siempre debe obtener exactamente orden FIFO/LIFO al conseguir artículos de un solo hilo.)

+0

Gracias por la sugerencia. Desafortunadamente estoy haciendo una simulación de CPU que realmente usa multiprocesamiento. A mi leal saber y entender, la cola estándar no puede manejar procesos. Actualizaré mi pregunta con algunos detalles sobre eso. – KobeJohn

Cuestiones relacionadas