Estoy tratando de hacer que un archivo como el objeto que se debe asignar a sys.stdout/sys.stderr durante la prueba proporcione una salida determinista. No está destinado a ser rápido, solo confiable. Lo que tengo hasta ahora casi funciona, pero necesito algo de ayuda para deshacerme de los últimos pocos errores de mayúsculas y minúsculas.Multiproceso de Python: sincronizar el objeto similar a un archivo
Aquí está mi implementación actual.
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from os import getpid
class MultiProcessFile(object):
"""
helper for testing multiprocessing
multiprocessing poses a problem for doctests, since the strategy
of replacing sys.stdout/stderr with file-like objects then
inspecting the results won't work: the child processes will
write to the objects, but the data will not be reflected
in the parent doctest-ing process.
The solution is to create file-like objects which will interact with
multiprocessing in a more desirable way.
All processes can write to this object, but only the creator can read.
This allows the testing system to see a unified picture of I/O.
"""
def __init__(self):
# per advice at:
# http://docs.python.org/library/multiprocessing.html#all-platforms
from multiprocessing import Queue
self.__master = getpid()
self.__queue = Queue()
self.__buffer = StringIO()
self.softspace = 0
def buffer(self):
if getpid() != self.__master:
return
from Queue import Empty
from collections import defaultdict
cache = defaultdict(str)
while True:
try:
pid, data = self.__queue.get_nowait()
except Empty:
break
cache[pid] += data
for pid in sorted(cache):
self.__buffer.write('%s wrote: %r\n' % (pid, cache[pid]))
def write(self, data):
self.__queue.put((getpid(), data))
def __iter__(self):
"getattr doesn't work for iter()"
self.buffer()
return self.__buffer
def getvalue(self):
self.buffer()
return self.__buffer.getvalue()
def flush(self):
"meaningless"
pass
... y una escritura de la prueba rápida:
#!/usr/bin/python2.6
from multiprocessing import Process
from mpfile import MultiProcessFile
def printer(msg):
print msg
processes = []
for i in range(20):
processes.append(Process(target=printer, args=(i,), name='printer'))
print 'START'
import sys
buffer = MultiProcessFile()
sys.stdout = buffer
for p in processes:
p.start()
for p in processes:
p.join()
for i in range(20):
print i,
print
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
print
print 'DONE'
print
buffer.buffer()
print buffer.getvalue()
Esto funciona a la perfección el 95% del tiempo, pero tiene tres problemas borde de los casos. Tengo que ejecutar el script de prueba en un bucle while para reproducir estos.
- El 3% de las veces, la salida del proceso principal no se refleja por completo. Supongo que esto se debe a que los datos se están consumiendo antes de que el hilo Queue-flushing pueda ponerse al día. No he pensado en una forma de esperar el hilo sin bloqueo.
- 0,5% de las veces, hay un rastreo de la aplicación multiprocess.Queue
- 0,01% de las veces, los PID envolver alrededor, y por lo tanto la clasificación por PID da el orden equivocado.
En el peor de los casos (odds: uno de cada 70 millones), la salida se vería así:
START
DONE
302 wrote: '19\n'
32731 wrote: '0 1 2 3 4 5 6 7 8 '
32732 wrote: '0\n'
32734 wrote: '1\n'
32735 wrote: '2\n'
32736 wrote: '3\n'
32737 wrote: '4\n'
32738 wrote: '5\n'
32743 wrote: '6\n'
32744 wrote: '7\n'
32745 wrote: '8\n'
32749 wrote: '9\n'
32751 wrote: '10\n'
32752 wrote: '11\n'
32753 wrote: '12\n'
32754 wrote: '13\n'
32756 wrote: '14\n'
32757 wrote: '15\n'
32759 wrote: '16\n'
32760 wrote: '17\n'
32761 wrote: '18\n'
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/threading.py", line 484, in run
File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
En python2.7 la excepción es ligeramente diferente:
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
File "/usr/lib/python2.7/threading.py", line 505, in run
File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
<type 'exceptions.IOError'>: [Errno 32] Broken pipe
¿Cómo me deshago de estos casos extremos?
¿Cuál es la pregunta real que estás haciendo? ¿Por qué estás recibiendo esas excepciones? ¿Por qué están ocurriendo cada uno de los casos extremos? –
@Daniel: cómo deshacerse de esos tres problemas. Creo que me he dejado más claro al agregar una oración a la introducción. ¿Eso ayuda? – bukzor