2010-08-27 24 views
6

Esto es lo que estoy tratando de lograr -pitón -> módulo de multiprocesamiento

  1. Tengo cerca de un millón de archivos que necesito para analizar & anexar el contenido analizada en un solo archivo.
  2. Dado que un único proceso lleva años, esta opción está desactivada.
  3. No usar subprocesos en Python, ya que básicamente se trata de ejecutar un único proceso (debido a GIL).
  4. Por lo tanto, utilizando el módulo de multiprocesamiento. es decir, engendrando 4 subprocesos para utilizar todo ese poder de núcleo sin procesar :)

Hasta ahora todo bien, ahora necesito un objeto compartido al que tengan acceso todos los subprocesos. Estoy usando Colas del módulo de multiprocesamiento. Además, todos los subprocesos necesitan escribir su salida en un solo archivo. Un lugar potencial para usar Locks, supongo. Con esta configuración cuando corro, no obtengo ningún error (por lo que el proceso principal parece correcto), simplemente se detiene. Cuando presiono ctrl-C veo un traceback (uno para cada subproceso). Tampoco se escribe ninguna salida en el archivo de salida. Aquí está el código (tenga en cuenta que todo funciona bien sin multi-procesos) -

import os 
import glob 
from multiprocessing import Process, Queue, Pool 

data_file = open('out.txt', 'w+') 

def worker(task_queue): 
    for file in iter(task_queue.get, 'STOP'): 
     data = mine_imdb_page(os.path.join(DATA_DIR, file)) 
     if data: 
      data_file.write(repr(data)+'\n') 
    return 

def main(): 
    task_queue = Queue() 
    for file in glob.glob('*.csv'): 
     task_queue.put(file) 
    task_queue.put('STOP') # so that worker processes know when to stop 

    # this is the block of code that needs correction. 
    if multi_process: 
     # One way to spawn 4 processes 
     # pool = Pool(processes=4) #Start worker processes 
     # res = pool.apply_async(worker, [task_queue, data_file]) 

     # But I chose to do it like this for now. 
     for i in range(4): 
      proc = Process(target=worker, args=[task_queue]) 
      proc.start() 
    else: # single process mode is working fine! 
     worker(task_queue) 
    data_file.close() 
    return 

¿qué estoy haciendo mal? También intenté pasar el archivo_objeto abierto a cada uno de los procesos en el momento del desove. Pero sin efecto. por ejemplo, Process(target=worker, args=[task_queue, data_file]). Pero esto no cambió nada. Siento que los subprocesos no pueden escribir en el archivo por alguna razón. O bien la instancia del file_object no se está replicando (en el momento del engendro) o alguna otra peculiaridad ... ¿Alguien tuvo una idea?

EXTRA: también ¿Hay alguna manera de mantener un mysql_connection persistente abierta & pase a través a los sub_processes? Así que abro una conexión mysql en mi proceso principal & la conexión abierta debe ser accesible para todos mis subprocesos. Básicamente, esto es el equivalente de una memoria compartida en python. Alguna idea aqui?

+0

Si no escribe en el archivo pero imprime, ¿funciona entonces? (En Linux haría python script.py> out.dat para evitar inundaciones de pantalla). – extraneon

+1

Y creo que proc.start no es bloqueante, por lo que probablemente debería esperar en algún lugar para dar al proceso la oportunidad de hacer un trabajo antes de hacer el archivo de datos.close() – extraneon

+0

data_file.close() se hace al final. ¿Debería afectar aquí? También la impresión funciona bien. Veo el resultado en la pantalla cuando uso print ... Pero quiero usar el archivo. ¡ayuda! También, ¿hay alguna forma de mantener abierta una mysql_connection persistente y pasarla a los sub_processes? –

Respuesta

4

Aunque la discusión con Eric fue fructífera, más adelante encontré una mejor manera de hacerlo. Dentro del módulo de multiprocesamiento hay un método llamado 'Pool' que es perfecto para mis necesidades.

Se ha optimizado a la cantidad de núcleos que tiene mi sistema. es decir, solo se generan tantos procesos como el no. de núcleos. Por supuesto, esto es personalizable. Así que aquí está el código. Podría ayudar a alguien más tarde-

from multiprocessing import Pool 

def main(): 
    po = Pool() 
    for file in glob.glob('*.csv'): 
     filepath = os.path.join(DATA_DIR, file) 
     po.apply_async(mine_page, (filepath,), callback=save_data) 
    po.close() 
    po.join() 
    file_ptr.close() 

def mine_page(filepath): 
    #do whatever it is that you want to do in a separate process. 
    return data 

def save_data(data): 
    #data is a object. Store it in a file, mysql or... 
    return 

todavía va a través de este enorme módulo.No estoy seguro de si save_data() se ejecuta mediante el proceso primario o si esta función es utilizada por procesos secundarios generados. Si es el niño el que hace el ahorro, podría generar problemas de simultaneidad en algunas situaciones. Si alguien tiene más experiencia en el uso de este módulo, apreciará más conocimiento aquí ...

3

la documentación para multiprocesamiento indican varios métodos para compartir el estado entre procesos:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

estoy seguro que cada proceso consigue un intérprete fresco y entonces el objetivo (función) y args se cargan en ella. En ese caso, el espacio de nombre global de su secuencia de comandos se habría vinculado a su función de trabajo, por lo que el archivo_de_datos estaría allí. Sin embargo, no estoy seguro de qué sucede con el descriptor de archivo a medida que se copia. ¿Has intentado pasar el objeto de archivo como uno de los argumentos?

Una alternativa es pasar otra cola que contendrá los resultados de los trabajadores. Los trabajadores put los resultados y el código principal get s los resultados y los escribe en el archivo.

+0

¡Sí! Yo podría hacer eso. Podría tener otra cola, que sería algo así como una salida de cola en la que se escriben los procesos. Dado que el proceso principal tiene acceso a esto, podría seguir leyendo esta cola y escribir en el archivo. ¡Esto podría funcionar! También intenté pasar el objeto de archivo como uno de los argumentos. No parece funcionar. Los hilos no escriben en el archivo. También Eric, ¿alguna idea de cómo pasar una persistente conexión mysql a subprocesos? –

+0

@Srikar, espero que ayude. En cuanto a las conexiones de mysql, no estoy seguro de eso. Yo diría que está mejor con una conexión separada para cada proceso. Incluso si pudieras una conexión, no estoy seguro de cómo es "seguro para subprocesos". Si realmente tuvieras que compartir solo uno, probablemente tendrías que hacer cosas raras. Por otra parte, también puede proxy el mecanismo de consulta/respuesta de la conexión en una cola. Luego, el proceso principal (o un proceso de manejo de mysql separado) obtiene las consultas de la cola, las ejecuta y devuelve los resultados ... o algo así. –

Cuestiones relacionadas