2011-11-23 8 views
5

Estoy tratando de procesar el contenido de un archivo tar utilizando multiprocessing.Pool. Puedo utilizar con éxito la implementación de ThreadPool dentro del módulo de multiprocesamiento, pero me gustaría poder utilizar procesos en lugar de hilos, ya que posiblemente sea más rápido y eliminar algunos cambios realizados para que Matplotlib maneje el entorno multiproceso. Estoy consiguiendo un error que sospecho que se relaciona con procesos no comparten el espacio de direcciones, pero no estoy seguro de cómo solucionarlo:¿Cómo puedo procesar un archivo tar con un grupo de multiprocesamiento de Python?

Traceback (most recent call last): 
    File "test_tarfile.py", line 32, in <module> 
    test_multiproc() 
    File "test_tarfile.py", line 24, in test_multiproc 
    pool.map(read_file, files) 
    File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get 
    raise self._value 
ValueError: I/O operation on closed file 

El programa en sí es más complicado, pero esto es un ejemplo de lo estoy haciendo que reproduce el error:

from multiprocessing.pool import ThreadPool, Pool 
import StringIO 
import tarfile 

def write_tar(): 
    tar = tarfile.open('test.tar', 'w') 
    contents = 'line1' 
    info = tarfile.TarInfo('file1.txt') 
    info.size = len(contents) 
    tar.addfile(info, StringIO.StringIO(contents)) 
    tar.close() 

def test_multithread(): 
    tar = tarfile.open('test.tar') 
    files = [tar.extractfile(member) for member in tar.getmembers()] 
    pool = ThreadPool(processes=1) 
    pool.map(read_file, files) 
    tar.close() 

def test_multiproc(): 
    tar = tarfile.open('test.tar') 
    files = [tar.extractfile(member) for member in tar.getmembers()] 
    pool = Pool(processes=1) 
    pool.map(read_file, files) 
    tar.close() 

def read_file(f): 
    print f.read() 

write_tar() 
test_multithread() 
test_multiproc() 

sospecho que el que algo va mal cuando el objeto TarInfo se pasa a otro proceso, pero el padre no es TarFile, pero no estoy seguro de cómo solucionarlo en el caso multiproceso. ¿Puedo hacer esto sin tener que extraer archivos del tarball y escribirlos en el disco?

Respuesta

5

usted no está pasando un objeto TarInfo en el otro proceso, estás pasando el resultado de tar.extractfile(member) en el otro proceso donde member es un objeto TarInfo. El método extractfile(...) devuelve un objeto similar a un archivo que tiene, entre otras cosas, un método read() que opera sobre el archivo tar original que abrió con tar = tarfile.open('test.tar').

Sin embargo, no puede usar un archivo abierto de un proceso en otro proceso, debe volver a abrir el archivo. He sustituido el test_multiproc() con esto:

def test_multiproc(): 
    tar = tarfile.open('test.tar') 
    files = [name for name in tar.getnames()] 
    pool = Pool(processes=1) 
    result = pool.map(read_file2, files) 
    tar.close() 

y ha añadido lo siguiente:

def read_file2(name): 
    t2 = tarfile.open('test.tar') 
    print t2.extractfile(name).read() 
    t2.close() 

y fue capaz de obtener su código de trabajo.

+0

Compatibilidad con Windows: 'if name == '__main__': test_multiproc()'. No hay fork en Windows, por lo que el módulo se importa en el nuevo proceso inicialmente con el nombre ''__parents_main___', y luego el nombre vuelve a ser '' __main __ ''. Por lo tanto, puede proteger declaraciones que no desea ejecutar en los procesos secundarios mediante el uso de un bloque 'if'. – eryksun

+0

Esto funciona, pero requiere que vuelva a abrir el archivo tar en cada proceso. ¿Hay alguna otra solución que permita el acceso de solo lectura a un descriptor de archivo entre procesos? –

+0

Terminé configurando una bandera para leer previamente los datos antes de que se desvíen los múltiples procesos. ¡Gracias! –

Cuestiones relacionadas