2010-01-18 14 views
6

Tengo un gran archivo de datos XML (> 160M) para procesar, y parece que el análisis SAX/expat/pulldom es el camino a seguir. Me gustaría tener un hilo que revise los nodos y empuje los nodos para que se procesen en una cola, y luego otros hilos de trabajo extraen el siguiente nodo disponible de la cola y lo procesan.¿Cómo puedo procesar xml asincrónicamente en python?

tengo el siguiente (debe tener cerraduras, sé - que, más adelante)

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 

def start_handler(name, attrs): 
    q.append(name) 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    print(q) 
    time.sleep(1) 

El problema es que el cuerpo del bloque while se llama sólo una vez, y entonces no puedo incluso ctrl-C lo interrumpe. En archivos más pequeños, el resultado es el esperado, pero eso parece indicar que solo se llama al controlador cuando el documento está completamente analizado, lo que parece frustrar el propósito de un analizador SAX.

Estoy seguro de que es mi propia ignorancia, pero no veo dónde estoy cometiendo el error.

PS: Yo también intentó cambiar start_handler así:

def start_handler(name, attrs): 
    def app(): 
     q.append(name) 
    u = threading.Thread(group=None, target=app) 
    u.start() 

Sin amor, sin embargo.

Respuesta

7

ParseFile, como habrás notado, simplemente "engullo" todo - no es bueno para el incremental ¡análisis que quieres hacer! Por lo tanto, simplemente alimentar el archivo al analizador un poco a la vez, asegurándose de producir condicionalmente el control a otros hilos a medida que avanza - por ejemplo:

while True: 
    data = f.read(BUFSIZE) 
    if not data: 
    p.Parse('', True) 
    break 
    p.Parse(data, False) 
    time.sleep(0.0) 

la llamada time.sleep(0.0) es la forma de Python decir "rendimiento a otra hilos si alguno está listo y esperando "; el método Parse está documentado here.

El segundo punto es, ¡olvídese de los bloqueos para este uso! - Use Queue.Queue en su lugar, es intrínsecamente seguro para los hilos y casi invariablemente la mejor y más simple forma de coordinar múltiples hilos en Python. Simplemente haga una instancia Queueq, q.put(name) en ella, y ha trabajado el bloque de hilos en q.get() esperando para obtener algo más de trabajo que hacer - ¡es TAN simple!

(Hay varias estrategias auxiliares que puede utilizar para coordinar la terminación de subprocesos de trabajo cuando no hay más trabajo para ellos, pero los requisitos especiales más simples y ausentes, es simplemente convertirlos en subprocesos de daemon, por lo que todos terminar cuando el hilo principal lo hace - ver the docs).

+0

Has votado por las sugerencias de cola, pero ¿estás seguro de que ParseFile traga todo de una vez? Devuelve la llamada a los manejadores de Python para manejar las etiquetas a medida que avanza, ese es el propósito del análisis SAX ... ¿o estás diciendo que eso no es suficiente para activar un cambio de hilo en Python? –

+1

Si desea SAX, puede usar xml.sax, consulte http://docs.python.org/library/xml.sax.html?highlight=sax#module-xml.sax; el OP no usa SAX, sino más bien xml.parsers.expat, una interfaz de baja abstracción que ** no ** impone una estrategia incremental (lo'supports_ it, pero no _impose_ it, dejándolo hasta el nivel de código de Python escoger y elegir). –

+0

La elección de expatriado fue algo arbitraria, no pude encontrar una buena explicación de la diferencia entre expatriados y saxofones. El módulo de saxo funciona igual de bien, incluso mejor, ya que parece ser tan asíncrono como lo necesitaba. Terminé adoptando el método de "alimentarlo por partes" de todos modos, ya que me da la oportunidad de esterilizar las cadenas que alimento antes de que el analizador llegue a ellas. Muy útil respuesta, gracias. – decitrig

1

Lo único que veo que está mal es que está accediendo a q simultáneamente desde diferentes subprocesos, sin bloqueo mientras escribe de hecho. Eso es un problema, y ​​probablemente estés teniendo problemas con el intérprete de Python que te está bloqueando. :)

Trate de bloqueo, en realidad no es muy difícil:

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 
q_lock = threading.Lock() <--- 

def start_handler(name, attrs): 
    q_lock.acquire() <--- 
    q.append(name) 
    q_lock.release() <--- 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    q_lock.acquire() <--- 
    print(q) 
    q_lock.release() <--- 
    time.sleep(1) 

Ves, fue muy simple, que acaba de crear una variable de bloqueo para proteger nuestro objeto, y adquirir de que se bloquee cada vez antes de usar el objeto y la liberación cada vez que terminamos nuestra tarea en el objeto. De esta forma garantizamos que q.append(name) nunca se superpondrá con print(q).


(Con las nuevas versiones de Python también hay un "con ...." sintaxis que le ayuda a no liberar las cerraduras o cerrar archivos u otras limpiezas se olvida con frecuencia.)

7

estoy no muy seguro sobre este problema. Supongo que la llamada a ParseFile está bloqueando y solo se está ejecutando el hilo de análisis debido a GIL. Una forma de evitar esto sería usar multiprocessing en su lugar. Está diseñado para trabajar con colas, de todos modos.

usted hace una Process y se puede pasar un Queue:

import sys, time 
import xml.parsers.expat 
import multiprocessing 
import Queue 

def do_expat(q): 
    p = xml.parsers.expat.ParserCreate() 

    def start_handler(name, attrs): 
     q.put(name) 

    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 

if __name__ == '__main__': 
    q = multiprocessing.Queue() 
    process = multiprocessing.Process(target=do_expat, args=(q,)) 
    process.start() 

    elements = [] 
    while True: 
     while True: 
      try: 
       elements.append(q.get_nowait()) 
      except Queue.Empty: 
       break 

     print elements 
     time.sleep(1) 

He incluido una lista de elementos, sólo para replicar su guión original. Su solución final probablemente usará get_nowait y Pool o algo similar.

+1

Sí, este es un buen camino a seguir, como dijiste, querrías usar colas de todos modos. –

+0

Probé ese código; evita el bloqueo, pero ParseFile todavía no parece generar ningún resultado hasta que se lee toda la entrada. – decitrig

0

No sé mucho sobre la implementación, pero si el análisis es el código C que se ejecuta hasta que se complete, no se ejecutarán otros subprocesos de Python. Si el analizador vuelve a llamar al código de Python, el GIL puede liberarse para que se ejecuten otros subprocesos, pero no estoy seguro de eso. Es posible que desee verificar esos detalles.

Cuestiones relacionadas