2012-09-18 13 views
16

Tengo un script que incluye abrir un archivo de una lista y luego hacer algo con el texto dentro de ese archivo. Estoy usando el multiproceso de Python y Pool para tratar de paralelizar esta operación. Una abstracción del guión es el siguiente:multiproceso de python apply_async solo usa un proceso

import os 
from multiprocessing import Pool 

results = [] 
def testFunc(files): 
    for file in files: 
     print "Working in Process #%d" % (os.getpid()) 
     #This is just an illustration of some logic. This is not what I'm actually doing. 
     for line in file: 
      if 'dog' in line: 
       results.append(line) 

if __name__=="__main__": 
    p = Pool(processes=2) 
    files = ['/path/to/file1.txt', '/path/to/file2.txt'] 
    results = p.apply_async(testFunc, args = (files,)) 
    results2 = results.get() 

Cuando ejecuto esto la impresión de la identificación del proceso es el mismo para cada iteración. Básicamente, lo que trato de hacer es tomar cada elemento de la lista de entrada y bifurcarlo en un proceso separado, pero parece que un proceso está haciendo todo el trabajo.

Respuesta

28
  • apply_async realiza una tarea en la piscina. Debería llamar al apply_async muchas veces para ejercitar más procesadores.
  • No permita que ambos procesos intenten escribir en la misma lista, results. Como los trabajadores del grupo son procesos separados, los dos no escribirán en la misma lista. Una forma de evitar esto es utilizar una cola de salida. Puede configurarlo usted mismo, o usar la devolución de llamada de apply_async para configurar la cola por usted. apply_async llamará a la devolución de llamada una vez que la función se complete.
  • Puede usar map_async en lugar de apply_async, pero luego obtendrá y obtendrá una lista de listas, que luego deberá aplanar.

Por lo tanto, quizás intenta algo como:

import os 
import multiprocessing as mp 

results = [] 

def testFunc(file): 
    result = [] 
    print "Working in Process #%d" % (os.getpid()) 
    # This is just an illustration of some logic. This is not what I'm 
    # actually doing. 
    with open(file, 'r') as f: 
     for line in f: 
      if 'dog' in line: 
       result.append(line) 
    return result 


def collect_results(result): 
    results.extend(result) 

if __name__ == "__main__": 
    p = mp.Pool(processes=2) 
    files = ['/path/to/file1.txt', '/path/to/file2.txt'] 
    for f in files: 
     p.apply_async(testFunc, args=(f,), callback=collect_results) 
    p.close() 
    p.join() 
    print(results) 
7

Tal vez en este caso se debe utilizar map_async:

import os 
from multiprocessing import Pool 

results = [] 
def testFunc(file): 
    message = ("Working in Process #%d" % (os.getpid())) 
    #This is just an illustration of some logic. This is not what I'm actually doing. 
    for line in file: 
     if 'dog' in line: 
      results.append(line) 
    return message 

if __name__=="__main__": 
    print("saddsf") 
    p = Pool(processes=2) 
    files = ['/path/to/file1.txt', '/path/to/file2.txt'] 
    results = p.map_async(testFunc, files) 
    print(results.get()) 
+1

O quizás simplemente 'map' si vas a' resultados .get() 'de inmediato. – mgilson

+0

Agradezco la respuesta, pero trato de seguir con apply_async por varias razones. – user1074057

Cuestiones relacionadas