2011-06-29 14 views
8

Tengo el siguiente problema en python.Escribiendo en un archivo con multiprocesamiento

Necesito hacer algunos cálculos en paralelo cuyos resultados debo escribir secuencialmente en un archivo. Así que creé una función que recibe un mango multiprocessing.Queue y un archivo, haga el cálculo e imprimir el resultado en el archivo:

import multiprocessing 
from multiprocessing import Process, Queue 
from mySimulation import doCalculation 

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file 

def work(queue, fh): 
while True: 
    try: 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
    except: 
     break 


if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fh = open("foo", "w") 
    workQueue = Queue() 
    parList = # list of conditions for which I want to run doCalculation() 
    for x in parList: 
     workQueue.put(x) 
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)] 
    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 
    fh.close() 

Pero el archivo termina vacía después de que el script se ejecuta. Traté de cambiar la función worker() a:

def work(queue, filename): 
while True: 
    try: 
     fh = open(filename, "a") 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
     fh.close() 
    except: 
     break 

y paso el nombre del archivo como parámetro. Entonces funciona como lo pretendí. Cuando trato de hacer lo mismo de forma secuencial, sin multiprocesamiento, también funciona normalmente.

¿Por qué no funcionó en la primera versión? No puedo ver el problema

Además, ¿puedo garantizar que dos procesos no intentarán escribir el archivo simultáneamente?


EDIT:

Gracias. Lo entiendo ahora. Esta es la versión de trabajo:

import multiprocessing 
from multiprocessing import Process, Queue 
from time import sleep 
from random import uniform 

def doCalculation(par): 
    t = uniform(0,2) 
    sleep(t) 
    return par * par # just to simulate some calculation 

def feed(queue, parlist): 
    for par in parlist: 
      queue.put(par) 

def calc(queueIn, queueOut): 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      print "dealing with ", par, "" 
      res = doCalculation(par) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 

if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fname = "foo" 
    workerQueue = Queue() 
    writerQueue = Queue() 
    parlist = [1,2,3,4,5,6,7,8,9,10] 
    feedProc = Process(target = feed , args = (workerQueue, parlist)) 
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] 
    writProc = Process(target = write, args = (writerQueue, fname)) 


    feedProc.start() 
    for p in calcProc: 
     p.start() 
    writProc.start() 

    feedProc.join() 
    for p in calcProc: 
     p.join() 
    writProc.join() 
+2

Por favor, concéntrese. Un conjunto de código ** solo **. Por favor, elimine el código obsoleto o irrelevante. Por favor, evita usar "Editar". Simplemente haga que la pregunta sea perfectamente clara, completa y consistente, por favor. –

Respuesta

16

debería querer usar dos colas y los tres tipos separados de procesamiento.

  1. Ponga cosas en la cola n. ° 1.

  2. Obtén cosas de la cola n. ° 1 y realiza cálculos, colocando cosas en la cola n. ° 2. Puede tener muchos de estos, ya que obtienen de una cola y los colocan en otra cola de forma segura.

  3. Obtén cosas de la Cola 2 y escríbelas en un archivo. Debe tener exactamente 1 de estos y no más. "Posee" el archivo, garantiza el acceso atómico y garantiza que el archivo está escrito de forma limpia y consistente.

+1

+1 para colas de trabajadores y consumidores. Recuerde establecer un tamaño máximo en la cola o sus trabajadores pueden comer su memoria y matar de hambre al escritor. – Bittrance

+0

@ S.Lott @Bittrance, eche un vistazo a mi edición. –

+1

Oh, no importa de las múltiples ejecuciones ... Soy tan estúpido como para no darme cuenta de que lancé el feedPro y el writProc varias veces. ¬¬ Corregí el código. Pero todavía tengo un archivo vacío. –

4

Si alguien está buscando una manera simple de hacer lo mismo, esto puede ayudarlo. No creo que haya desventajas al hacerlo de esta manera. Si hay, házmelo saber.

import multiprocessing 
import re 

def mp_worker(item): 
    # Do something 
    return item, count 

def mp_handler(): 
    cpus = multiprocessing.cpu_count() 
    p = multiprocessing.Pool(cpus) 
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step. 
    with open('ExampleFile.txt') as f: 
     listX = [line for line in (l.strip() for l in f) if line] 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, listX): 
      # (item, count) tuples from worker 
      f.write('%s: %d\n' % result) 

if __name__=='__main__': 
    mp_handler() 

Fuente: Python: Writing to a single file with queue while using multiprocessing Pool

0

Hay un error en el código de escritura del trabajador, si el bloque es falso, el trabajador no recibirá los datos. Debe ser el siguiente:

par, res = queue.get(block = True) 

se puede comprobar mediante la adición de la línea

print "QSize",queueOut.qsize() 

después de la queueOut.put((par,res))

Con bloque = False usted estaría consiguiendo cada vez mayor longitud de la cola hasta que se se llena, a diferencia del bloque = True donde siempre obtiene "1".

Cuestiones relacionadas