2012-01-03 22 views
15

Estoy tratando de paralelizar una aplicación usando multiprocesamiento que toma en un archivo csv muy grande (64MB a 500MB), hace algo de trabajo línea por línea, y luego genera un pequeño archivo de tamaño fijo .¿Chunking de datos de un archivo grande para multiprocesamiento?

Actualmente hago un list(file_obj), que, desgraciadamente, se carga por completo en la memoria (creo) y luego romper esa lista hacia arriba en n partes, siendo n el número de procesos que desea ejecutar. Luego hago un pool.map() en las listas rotas .

Esto parece tener un tiempo de ejecución muy, muy malo en comparación con una sola metodología enhebrada, just-open-the-file-and-iterate-over-it. ¿Puede alguien sugerir una mejor solución?

Además, necesito procesar las filas del archivo en grupos que conservan el valor de una cierta columna. Estos grupos de filas se pueden dividir, , pero ningún grupo debe contener más de un valor para esta columna.

Respuesta

14

list(file_obj) pueden requerir mucha memoria cuando fileobj es grande. Podemos reducir ese requisito de memoria usando itertools para extraer trozos de líneas cuando los necesitemos.

En particular, podemos utilizar

reader = csv.reader(f) 
chunks = itertools.groupby(reader, keyfunc) 

dividir el archivo en partes procesables, y

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] 
result = pool.map(worker, groups) 

tener la obra de la piscina multiprocesamiento en num_chunks trozos a la vez.

Al hacerlo, necesitamos aproximadamente la cantidad de memoria suficiente para contener unos pocos (num_chunks) trozos en la memoria, en lugar de todo el archivo.


import multiprocessing as mp 
import itertools 
import time 
import csv 

def worker(chunk): 
    # `chunk` will be a list of CSV rows all with the same name column 
    # replace this with your real computation 
    # print(chunk) 
    return len(chunk) 

def keyfunc(row): 
    # `row` is one row of the CSV file. 
    # replace this with the name column. 
    return row[0] 

def main(): 
    pool = mp.Pool() 
    largefile = 'test.dat' 
    num_chunks = 10 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 
    pool.close() 
    pool.join() 
    print(results) 

if __name__ == '__main__': 
    main() 
+0

mentí cuando dije que las líneas no están relacionados entre sí - en el csv, hay una columna que hay que dividir por (una columna nombre, y todas las filas con ese nombre no puede ser dividido). Sin embargo, creo que puedo adaptar esto al grupo en este criterio. ¡Gracias! No sabía nada sobre itertools, y ahora poco más que nada. – user1040625

+0

Hubo un error en mi código original. Todas las llamadas a 'pool.apply_async' no son de bloqueo, por lo que todo el archivo se puso en cola al mismo tiempo. Esto hubiera resultado en ningún ahorro de memoria. Así que modifiqué un poco el bucle para poner 'num_chunks' a la vez. La llamada a 'pool.map' está bloqueando, lo que evitará que todo el archivo se ponga en cola a la vez. – unutbu

+0

@HappyLeapSegundo usuario está intentando implementar sus métodos aquí http://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiproccessing y está teniendo problemas. Quizás puedas ayudar? – m0meni

1

Lo mantendría simple. Haga que un solo programa abra el archivo y léalo línea por línea. Puede elegir en cuántos archivos dividirlo, abrir tantos archivos de salida y cada línea escribir en el siguiente archivo. Esto dividirá el archivo en n partes iguales. A continuación, puede ejecutar un programa de Python contra cada uno de los archivos en paralelo.

Cuestiones relacionadas