6

Estoy tratando de hacer que un archivo como el objeto que se debe asignar a sys.stdout/sys.stderr durante la prueba proporcione una salida determinista. No está destinado a ser rápido, solo confiable. Lo que tengo hasta ahora casi funciona, pero necesito algo de ayuda para deshacerme de los últimos pocos errores de mayúsculas y minúsculas.Multiproceso de Python: sincronizar el objeto similar a un archivo

Aquí está mi implementación actual.

try: 
    from cStringIO import StringIO 
except ImportError: 
    from StringIO import StringIO 

from os import getpid 
class MultiProcessFile(object): 
    """ 
    helper for testing multiprocessing 

    multiprocessing poses a problem for doctests, since the strategy 
    of replacing sys.stdout/stderr with file-like objects then 
    inspecting the results won't work: the child processes will 
    write to the objects, but the data will not be reflected 
    in the parent doctest-ing process. 

    The solution is to create file-like objects which will interact with 
    multiprocessing in a more desirable way. 

    All processes can write to this object, but only the creator can read. 
    This allows the testing system to see a unified picture of I/O. 
    """ 
    def __init__(self): 
     # per advice at: 
     # http://docs.python.org/library/multiprocessing.html#all-platforms 
     from multiprocessing import Queue 
     self.__master = getpid() 
     self.__queue = Queue() 
     self.__buffer = StringIO() 
     self.softspace = 0 

    def buffer(self): 
     if getpid() != self.__master: 
      return 

     from Queue import Empty 
     from collections import defaultdict 
     cache = defaultdict(str) 
     while True: 
      try: 
       pid, data = self.__queue.get_nowait() 
      except Empty: 
       break 
      cache[pid] += data 
     for pid in sorted(cache): 
      self.__buffer.write('%s wrote: %r\n' % (pid, cache[pid])) 
    def write(self, data): 
     self.__queue.put((getpid(), data)) 
    def __iter__(self): 
     "getattr doesn't work for iter()" 
     self.buffer() 
     return self.__buffer 
    def getvalue(self): 
     self.buffer() 
     return self.__buffer.getvalue() 
    def flush(self): 
     "meaningless" 
     pass 

... y una escritura de la prueba rápida:

#!/usr/bin/python2.6 

from multiprocessing import Process 
from mpfile import MultiProcessFile 

def printer(msg): 
    print msg 

processes = [] 
for i in range(20): 
    processes.append(Process(target=printer, args=(i,), name='printer')) 

print 'START' 
import sys 
buffer = MultiProcessFile() 
sys.stdout = buffer 

for p in processes: 
    p.start() 
for p in processes: 
    p.join() 

for i in range(20): 
    print i, 
print 

sys.stdout = sys.__stdout__ 
sys.stderr = sys.__stderr__ 
print 
print 'DONE' 
print 
buffer.buffer() 
print buffer.getvalue() 

Esto funciona a la perfección el 95% del tiempo, pero tiene tres problemas borde de los casos. Tengo que ejecutar el script de prueba en un bucle while para reproducir estos.

  1. El 3% de las veces, la salida del proceso principal no se refleja por completo. Supongo que esto se debe a que los datos se están consumiendo antes de que el hilo Queue-flushing pueda ponerse al día. No he pensado en una forma de esperar el hilo sin bloqueo.
  2. 0,5% de las veces, hay un rastreo de la aplicación multiprocess.Queue
  3. 0,01% de las veces, los PID envolver alrededor, y por lo tanto la clasificación por PID da el orden equivocado.

En el peor de los casos (odds: uno de cada 70 millones), la salida se vería así:

START 

DONE 

302 wrote: '19\n' 
32731 wrote: '0 1 2 3 4 5 6 7 8 ' 
32732 wrote: '0\n' 
32734 wrote: '1\n' 
32735 wrote: '2\n' 
32736 wrote: '3\n' 
32737 wrote: '4\n' 
32738 wrote: '5\n' 
32743 wrote: '6\n' 
32744 wrote: '7\n' 
32745 wrote: '8\n' 
32749 wrote: '9\n' 
32751 wrote: '10\n' 
32752 wrote: '11\n' 
32753 wrote: '12\n' 
32754 wrote: '13\n' 
32756 wrote: '14\n' 
32757 wrote: '15\n' 
32759 wrote: '16\n' 
32760 wrote: '17\n' 
32761 wrote: '18\n' 

Exception in thread QueueFeederThread (most likely raised during interpreter shutdown): 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner 
    File "/usr/lib/python2.6/threading.py", line 484, in run 
     File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed 
<type 'exceptions.TypeError'>: 'NoneType' object is not callable 

En python2.7 la excepción es ligeramente diferente:

Exception in thread QueueFeederThread (most likely raised during interpreter shutdown): 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed 
<type 'exceptions.IOError'>: [Errno 32] Broken pipe 

¿Cómo me deshago de estos casos extremos?

+1

¿Cuál es la pregunta real que estás haciendo? ¿Por qué estás recibiendo esas excepciones? ¿Por qué están ocurriendo cada uno de los casos extremos? –

+0

@Daniel: cómo deshacerse de esos tres problemas. Creo que me he dejado más claro al agregar una oración a la introducción. ¿Eso ayuda? – bukzor

Respuesta

8

La solución viene en dos partes. He ejecutado con éxito el programa de prueba 200 mil veces sin ningún cambio en la salida.

La parte fácil era usar multiprocesamiento.corrent_process() ._ identity para ordenar los mensajes. Esto no forma parte de la API publicada, pero es un identificador único y determinista de cada proceso. Esto solucionó el problema con los PID que se ajustaban y daban un orden incorrecto de salida.

La otra parte de la solución fue usar multiprocesamiento.Manager(). Queue() en lugar de multiprocesamiento.Queue. Esto soluciona el problema n. ° 2 anterior porque el administrador vive en un Proceso separado y, por lo tanto, evita algunos de los casos especiales incorrectos al utilizar una cola del proceso de propiedad. # 3 es fijo porque la cola está completamente agotada y el hilo del alimentador muere de forma natural antes de que Python comience a cerrarse y cierre el stdin.

+4

multiprocesamiento.Manager(). Queue() en lugar del multiprocesamiento.Queue eliminó el error ": [Errno 32] Broken pipe" en Python 2.7 para mí –

+0

@JoshuaRichardson Usando un 'multiprocesamiento .Manager(). Queue() 'también lo resuelve para mí. Pero mis pruebas toman aproximadamente 7 veces más que con 'mutliprocessing.queues.Queue()'. – Bengt

+0

@Bengt: espero que no esté haciendo un gerente para cada cola. Solo necesitas uno. ¿Podría mostrarnos un punto de referencia mínimo? – bukzor

0

He encontrado muchos menos errores de multiprocessing con Python 2.7 que con Python 2.6. Habiendo dicho esto, la solución que utilicé para evitar el problema "Exception in thread QueueFeederThread" es sleep momentáneamente, posiblemente para 0.01s, en cada proceso en el que se usa Queue. Es cierto que el uso de sleep no es deseable o incluso confiable, pero se observó que la duración especificada funcionaba suficientemente bien en la práctica para mí. También puedes probar 0.1s.

+2

Narcolepsy nunca es una solución confiable. – bukzor

Cuestiones relacionadas