Lo que se busca es un modelo productor/consumidor
ejemplo enhebrado básico
Aquí es un ejemplo básico mediante el (en lugar de multiprocesamiento)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
Usted wouldn No comparta el objeto de archivo con los hilos. Debería producir trabajo para ellos suministrando el queue con una línea de datos. Luego, cada hilo lo levantaría y lo procesaría, luego lo devolvería en la cola.
Hay algunas instalaciones más avanzadas integradas en el multiprocessing module para compartir datos, como listas y special kind of Queue. Hay compensaciones al uso de multiprocesamiento frente a subprocesos y depende de si su trabajo está vinculado a la CPU o a la IO.
básico multiprocessing.Pool ejemplo
Aquí está un ejemplo muy básico de una piscina multiprocesamiento
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
A Pool es un objeto de conveniencia que gestiona sus propios procesos. Como un archivo abierto puede iterar sobre sus líneas, puede pasarlo al mapa, que lo recorrerá y entregará líneas a la función de trabajador. Map bloquea y devuelve el resultado completo cuando está hecho. Tenga en cuenta que en un ejemplo demasiado simple, el map
va a consumir su archivo de una sola vez antes de repartir el trabajo. Así que ten en cuenta si es más grande. Hay formas más avanzadas de diseñar una configuración de productor/consumidor. Manual
"pool" con límite y la línea de la separación ulterior
Este es un ejemplo de manual de la Pool.map, pero en lugar de consumir una iterables todo, se puede establecer un tamaño de la cola de modo que sólo se está alimentando es pieza por pieza tan rápido como puede procesar. También agregué los números de línea para que pueda rastrearlos y recurrir a ellos si lo desea más adelante.
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
Buena pregunta. También tuve esta duda. Aunque fui con la opción de dividir el archivo en archivos más pequeños :) –