2012-06-25 17 views
55

Tengo un único archivo de texto grande en el que quiero procesar cada línea (realizar algunas operaciones) y almacenarlas en una base de datos. Dado que un solo programa simple tarda demasiado, quiero que se realice a través de múltiples procesos o hilos. Cada hilo/proceso debe leer los datos DIFERENTES (diferentes líneas) de ese único archivo y hacer algunas operaciones en su pedazo de datos (líneas) y ponerlos en la base de datos para que al final, tenga todos los datos procesados ​​y mi base de datos se vierte con los datos que necesito.Procesamiento de un solo archivo desde procesos múltiples en python

Pero no soy capaz de descubrir cómo abordar esto.

+2

Buena pregunta. También tuve esta duda. Aunque fui con la opción de dividir el archivo en archivos más pequeños :) –

Respuesta

70

Lo que se busca es un modelo productor/consumidor

ejemplo enhebrado básico

Aquí es un ejemplo básico mediante el (en lugar de multiprocesamiento)

import threading 
import Queue 
import sys 

def do_work(in_queue, out_queue): 
    while True: 
     item = in_queue.get() 
     # process 
     result = item 
     out_queue.put(result) 
     in_queue.task_done() 

if __name__ == "__main__": 
    work = Queue.Queue() 
    results = Queue.Queue() 
    total = 20 

    # start for workers 
    for i in xrange(4): 
     t = threading.Thread(target=do_work, args=(work, results)) 
     t.daemon = True 
     t.start() 

    # produce data 
    for i in xrange(total): 
     work.put(i) 

    work.join() 

    # get the results 
    for i in xrange(total): 
     print results.get() 

    sys.exit() 

Usted wouldn No comparta el objeto de archivo con los hilos. Debería producir trabajo para ellos suministrando el queue con una línea de datos. Luego, cada hilo lo levantaría y lo procesaría, luego lo devolvería en la cola.

Hay algunas instalaciones más avanzadas integradas en el multiprocessing module para compartir datos, como listas y special kind of Queue. Hay compensaciones al uso de multiprocesamiento frente a subprocesos y depende de si su trabajo está vinculado a la CPU o a la IO.

básico multiprocessing.Pool ejemplo

Aquí está un ejemplo muy básico de una piscina multiprocesamiento

from multiprocessing import Pool 

def process_line(line): 
    return "FOO: %s" % line 

if __name__ == "__main__": 
    pool = Pool(4) 
    with open('file.txt') as source_file: 
     # chunk the work into batches of 4 lines at a time 
     results = pool.map(process_line, source_file, 4) 

    print results 

A Pool es un objeto de conveniencia que gestiona sus propios procesos. Como un archivo abierto puede iterar sobre sus líneas, puede pasarlo al mapa, que lo recorrerá y entregará líneas a la función de trabajador. Map bloquea y devuelve el resultado completo cuando está hecho. Tenga en cuenta que en un ejemplo demasiado simple, el map va a consumir su archivo de una sola vez antes de repartir el trabajo. Así que ten en cuenta si es más grande. Hay formas más avanzadas de diseñar una configuración de productor/consumidor. Manual

"pool" con límite y la línea de la separación ulterior

Este es un ejemplo de manual de la Pool.map, pero en lugar de consumir una iterables todo, se puede establecer un tamaño de la cola de modo que sólo se está alimentando es pieza por pieza tan rápido como puede procesar. También agregué los números de línea para que pueda rastrearlos y recurrir a ellos si lo desea más adelante.

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 
     line_no, line = item 

     # exit signal 
     if line == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = (line_no, line) 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    with open("source.txt") as f: 
     iters = itertools.chain(f, (None,)*num_workers) 
     for num_and_line in enumerate(iters): 
      work.put(num_and_line) 

    for p in pool: 
     p.join() 

    # get the results 
    # example: [(1, "foo"), (10, "bar"), (0, "start")] 
    print sorted(results) 
+0

sí, el archivo es más grande, aproximadamente 1 GB más o menos. No sé cuánto quiere decir más grande diciendo más grande, sin embargo, 1 GB es más grande para mí. – pranavk

+0

Eso está bien. Estoy seguro de que puede tomar estos ejemplos y extrapolar para sus necesidades. El enhebrado está bien tal como está. El multiprocesamiento solo necesita una cola similar para alimentar. – jdi

+1

Esto es bueno, pero ¿qué pasa si el procesamiento está vinculado a E/S? En ese caso, el paralelismo puede ralentizar las cosas en lugar de acelerarlo. Las búsquedas dentro de una sola pista de disco son mucho más rápidas que las búsquedas intertrack, y hacer I/O en paralelo tiende a introducir búsquedas intertrack en lo que de otro modo sería una carga de E/S secuencial. Para obtener algún beneficio de la E/S paralela, a veces ayuda bastante usar un espejo RAID. – user1277476

-4

rompe el archivo grande en varios archivos más pequeños y procesa cada uno de ellos en hilos separados.

+4

¿puedes mostrar algún código? – maq

+0

¡Esto no es lo que OP quiere! pero solo por una idea ... no está mal. – DRPK

5

Aquí hay un ejemplo muy estúpida que yo cociné:

import os.path 
import multiprocessing 

def newlinebefore(f,n): 
    f.seek(n) 
    c=f.read(1) 
    while c!='\n' and n > 0: 
     n-=1 
     f.seek(n) 
     c=f.read(1) 

    f.seek(n) 
    return n 

filename='gpdata.dat' #your filename goes here. 
fsize=os.path.getsize(filename) #size of file (in bytes) 

#break the file into 20 chunks for processing. 
nchunks=20 
initial_chunks=range(1,fsize,fsize/nchunks) 

#You could also do something like: 
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. 


with open(filename,'r') as f: 
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) 

end_byte=[i-1 for i in start_byte] [1:] + [None] 

def process_piece(filename,start,end): 
    with open(filename,'r') as f: 
     f.seek(start+1) 
     if(end is None): 
      text=f.read() 
     else: 
      nbytes=end-start+1 
      text=f.read(nbytes) 

    # process text here. createing some object to be returned 
    # You could wrap text into a StringIO object if you want to be able to 
    # read from it the way you would a file. 

    returnobj=text 
    return returnobj 

def wrapper(args): 
    return process_piece(*args) 

filename_repeated=[filename]*len(start_byte) 
args=zip(filename_repeated,start_byte,end_byte) 

pool=multiprocessing.Pool(4) 
result=pool.map(wrapper,args) 

#Now take your results and write them to the database. 
print "".join(result) #I just print it to make sure I get my file back ... 

La parte difícil aquí es asegurarse de que dividir el archivo en caracteres de nueva línea para que no se pierda ninguna línea (o solo lee líneas parciales).Luego, cada proceso lee que es parte del archivo y devuelve un objeto que se puede poner en la base de datos por el hilo principal. Por supuesto, puede que incluso necesite hacer esta parte en fragmentos para que no tenga que mantener toda la información en la memoria a la vez. (Esto se logra con bastante facilidad: simplemente divida la lista "args" en X fragmentos y llame al pool.map(wrapper,chunk) - Consulte here)

Cuestiones relacionadas