2011-05-18 11 views
5

Estoy escribiendo un raspador web en python, usando httplib2 y lxml (sí, sé que podría estar usando scrapy. Pasemos eso ...) El raspador tiene alrededor de 15000 páginas para analizar aproximadamente 400,000 artículos. Tengo el código para analizar los elementos para que se ejecuten de forma instantánea (casi), pero la parte que descarga la página del servidor sigue siendo extremadamente lenta. Me gustaría superar eso a través de la concurrencia. Sin embargo, no puedo confiar en CADA página que deba analizarse CADA vez. Lo he intentado con un solo ThreadPool (como multiprocesamiento.pool, pero hecho con hilos, lo que debería estar bien ya que se trata de un proceso de E/S enlazado), pero no se me ocurrió una forma elegante de obtener TODOS los hilos para detener cuando la fecha del último elemento de índice era mayor que el artículo que estábamos procesando. En este momento, estoy trabajando en un método que utiliza dos instancias de ThreadPool: una para descargar cada página y otra para analizar las páginas. Un ejemplo de código simplificado es:Python - múltiples grupos de hilos simultáneos

#! /usr/bin/env python2 

import httplib2 
from Queue import PriorityQueue 
from multiprocessing.pool import ThreadPool 
from lxml.html import fromstring 

pages = [x for x in range(1000)] 
page_queue = PriorityQueue(1000) 

url = "http://www.google.com" 

def get_page(page): 
    #Grabs google.com 
    h = httplib2.Http(".cache") 
    resp, content = h.request(url, "GET") 
    tree = fromstring(str(content), base_url=url) 
    page_queue.put((page, tree)) 
    print page_queue.qsize() 

def parse_page(): 
    page_num, page = page_queue.get() 
    print "Parsing page #" + str(page_num) 
    #do more stuff with the page here 
    page_queue.task_done() 

if __name__ == "__main__": 
    collect_pool = ThreadPool() 
    collect_pool.map_async(get_page, pages) 
    collect_pool.close() 

    parse_pool = ThreadPool() 
    parse_pool.apply_async(parse_page) 
    parse_pool.close() 


    parse_pool.join() 
    collect_pool.join() 
    page_queue.join() 

La ejecución de este código sin embargo, no hago lo que espero - que es de disparar dos threadpools: uno poblar una cola y otro tirando de ella para analizar. Comienza el grupo de recopilación y lo ejecuta, y luego comienza el parse_pool y lo ejecuta (supongo que no he permitido que el código se ejecute el tiempo suficiente para llegar a parse_pool; el punto es que collect_pool es todo lo que parece estar ejecutándose)) Estoy bastante seguro de haber arruinado algo con el orden de las llamadas para unirme(), pero no puedo por la vida de mí averiguar en qué orden se supone que deben estar. Mi pregunta es esencialmente esta: : ¿Estoy ladrando el árbol correcto aquí? y si es así, ¿qué demonios estoy haciendo mal? Si no lo soy, ¿cuáles serían sus sugerencias

+1

map_async - bloquea hasta que se procese todo el trabajo. –

+0

Eso aborda formalmente por qué no funciona, pero no responde necesariamente a toda mi pregunta, que se reduce a "¿Es esto una manera loca de hacer esto?". Si la respuesta es 'no', estoy cerca y solo necesito perfeccionar mis métodos para hacerlo. En caso afirmativo, me gustaría algunos consejos sobre cómo puedo lograr esto 'correctamente'. – bbenne10

Respuesta

6

En primer lugar, su diseño parece ser el correcto a un alto nivel. El uso de un threadpool para recopilar las páginas se justifica por la naturaleza sincrónica del módulo httlib2. (Con una biblioteca asíncrona, un hilo sería suficiente; tenga en cuenta que incluso con httplib2 y el pool, como máximo, un subproceso de colector se ejecuta en cualquier momento debido al GIL). El grupo de análisis se justifica porque el módulo lxml se escribió en C/C++ (y suponiendo que así se libera el Global Interpreter Lock durante el análisis sintáctico de la página, ¡esto debe verificarse en los documentos o código lxml!). Si esto último no fuera cierto, no habría ganancia de rendimiento al tener un grupo de análisis dedicado ya que solo un hilo podría adquirir el GIL. En este caso, sería mejor usar un grupo de procesos.

No estoy familiarizado con la implementación ThreadPool, pero supongo que es análoga a la clase Pool en el módulo de multiprocesamiento. Sobre esta base, el problema parece ser que usted crea solo un único elemento de trabajo para parse_pool y luego de que parse_page procesa la primera página, nunca intenta dequeue otras páginas desde allí. Los elementos de trabajo adicionales tampoco se envían a este grupo, por lo que el proceso se detiene y, después de la llamada parse_pool.close(), finalizan los hilos del grupo (vacío).

La solución es eliminar la page_queue. La función get_page() debe poner un elemento de trabajo en parse_pool llamando a apply_async() para cada página que recopila, en lugar de incluirlos en page_queue.

El hilo principal debe esperar hasta que collect_queue esté vacío (es decir, se devolvió la llamada a collect_pool.join()), entonces debe cerrar el parse_pool (ya que podemos estar seguros de que no se enviarán más trabajos para el analizador). Luego debe esperar a que parse_pool se vacíe al llamar a parse_pool.join() y luego salir.

Además, debe aumentar el número de subprocesos en connect_pool para procesar más solicitudes http al mismo tiempo. El número predeterminado de subprocesos en un grupo es la cantidad de CPU; actualmente no puedes emitir más que muchas solicitudes. Puede experimentar con valores de hasta miles o miles de miles; observe el consumo de la CPU del grupo; no debería acercarse a 1 CPU.

+0

Votaba esto si pudiera. Impresionante respuesta - Gracias. – bbenne10

+0

@ bbenne10: Es una muy buena respuesta. Tienes que aceptarlo. –