Tengo el siguiente problema en python.Escribiendo en un archivo con multiprocesamiento
Necesito hacer algunos cálculos en paralelo cuyos resultados debo escribir secuencialmente en un archivo. Así que creé una función que recibe un mango multiprocessing.Queue
y un archivo, haga el cálculo e imprimir el resultado en el archivo:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation
# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file
def work(queue, fh):
while True:
try:
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
except:
break
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fh = open("foo", "w")
workQueue = Queue()
parList = # list of conditions for which I want to run doCalculation()
for x in parList:
workQueue.put(x)
processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
for p in processes:
p.start()
for p in processes:
p.join()
fh.close()
Pero el archivo termina vacía después de que el script se ejecuta. Traté de cambiar la función worker() a:
def work(queue, filename):
while True:
try:
fh = open(filename, "a")
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
fh.close()
except:
break
y paso el nombre del archivo como parámetro. Entonces funciona como lo pretendí. Cuando trato de hacer lo mismo de forma secuencial, sin multiprocesamiento, también funciona normalmente.
¿Por qué no funcionó en la primera versión? No puedo ver el problema
Además, ¿puedo garantizar que dos procesos no intentarán escribir el archivo simultáneamente?
EDIT:
Gracias. Lo entiendo ahora. Esta es la versión de trabajo:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform
def doCalculation(par):
t = uniform(0,2)
sleep(t)
return par * par # just to simulate some calculation
def feed(queue, parlist):
for par in parlist:
queue.put(par)
def calc(queueIn, queueOut):
while True:
try:
par = queueIn.get(block = False)
print "dealing with ", par, ""
res = doCalculation(par)
queueOut.put((par,res))
except:
break
def write(queue, fname):
fhandle = open(fname, "w")
while True:
try:
par, res = queue.get(block = False)
print >>fhandle, par, res
except:
break
fhandle.close()
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fname = "foo"
workerQueue = Queue()
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed , args = (workerQueue, parlist))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target = write, args = (writerQueue, fname))
feedProc.start()
for p in calcProc:
p.start()
writProc.start()
feedProc.join()
for p in calcProc:
p.join()
writProc.join()
Por favor, concéntrese. Un conjunto de código ** solo **. Por favor, elimine el código obsoleto o irrelevante. Por favor, evita usar "Editar". Simplemente haga que la pregunta sea perfectamente clara, completa y consistente, por favor. –