Switching from multiprocessing to threading

2012-05-21

In my project, I use the multiprocessing module to run tasks in parallel. I want to use threading instead, since it performs better (my tasks are TCP/IP bound, not CPU or I/O bound).

multiprocessing has wonderful functions, such as Pool.imap_unordered and Pool.map_async, that don't exist in the threading module.

What is the correct way to convert my code to use threading instead? The documentation introduces the multiprocessing.dummy module, which is a wrapper around the threading module. However, that raises an error (at least on Python 2.7.3):

    pool = multiprocessing.Pool(processes)
  File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 150, in Pool
    return ThreadPool(processes, initializer, initargs)
  File "C:\python27\lib\multiprocessing\pool.py", line 685, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "C:\python27\lib\multiprocessing\pool.py", line 136, in __init__
    self._repopulate_pool()
  File "C:\python27\lib\multiprocessing\pool.py", line 199, in _repopulate_pool
    w.start()
  File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 73, in start
    self._parent._children[self] = None
AttributeError: '_DummyThread' object has no attribute '_children'
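For comparison, a `multiprocessing.dummy.Pool` created from the main thread exposes the same `imap_unordered` and `map_async` methods as `multiprocessing.Pool`, only backed by threads. A minimal sketch, assuming a hypothetical `fetch` function in place of the real network calls (written to run on both 2.7 and 3.x):

```python
import time
from multiprocessing.dummy import Pool  # thread-backed Pool with the multiprocessing API


def fetch(n):
    """Hypothetical stand-in for a TCP/IP-bound task."""
    time.sleep(0.01)  # simulate waiting on the network
    return n * n


pool = Pool(4)  # four worker threads, created from the main thread
try:
    # imap_unordered yields each result as soon as its task finishes (any order)
    unordered = sorted(pool.imap_unordered(fetch, range(8)))
    # map_async returns immediately; get() waits up to 15 s for the ordered results
    ordered = pool.map_async(fetch, range(8)).get(timeout=15)
finally:
    pool.close()
    pool.join()
```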

Edit: What actually happens is that I have a GUI that runs a different thread (to keep the GUI from getting stuck). That thread runs the specific search function that has the failing ThreadPool.
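A minimal sketch of that setup, with a hypothetical `search_in_background` function standing in for the search run from the GUI's worker thread. On affected versions such as 2.7.3, the `Pool(3)` call inside the thread is where an `AttributeError` about the missing `_children` attribute was raised; on releases that include the fix it should run to completion:

```python
import threading
from multiprocessing.dummy import Pool

results = {}


def search_in_background():
    # Hypothetical stand-in for search(): the ThreadPool is created
    # inside a thread other than the main thread.
    pool = Pool(3)
    try:
        results["links"] = sorted(pool.imap_unordered(len, ["a", "bb", "ccc"]))
    finally:
        pool.close()
        pool.join()


worker = threading.Thread(target=search_in_background)
worker.start()
worker.join()
```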

Edit 2: The bug has been fixed and the fix will be included in future releases. Great to see a crasher fixed!

import urllib2, htmllib, formatter 
import multiprocessing.dummy as multiprocessing 
import xml.dom.minidom 
import os 
import string, random 
from urlparse import parse_qs, urlparse 

from useful_util import retry 
import config 
from logger import log 

class LinksExtractor(htmllib.HTMLParser):
    def __init__(self, formatter):
        htmllib.HTMLParser.__init__(self, formatter)
        self.links = []
        self.ignoredSites = config.WebParser_ignoredSites

    def start_a(self, attrs):
        for attr in attrs:
            if attr[0] == "href" and attr[1].endswith(".mp3"):
                # skip links that point to any ignored site
                if not any(site in attr[1] for site in self.ignoredSites):
                    self.links.append(attr[1])

    def get_links(self):
        return self.links


def GetLinks(url, returnMetaUrlObj=False):
    '''
    Function gathers links from a url.
    @param url: Url Address.
    @param returnMetaUrlObj: If true, returns a MetaUrl Object list.
          Else, returns a string list. Default is False.

    @return links: See returnMetaUrlObj.
    '''
    htmlparser = LinksExtractor(formatter.NullFormatter())

    try:
        data = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError) as e:
        log.error(e)
        return []
    htmlparser.feed(data.read())
    htmlparser.close()

    links = list(set(htmlparser.get_links()))

    if returnMetaUrlObj:
        links = map(MetaUrl, links)

    return links

def isAscii(s):
    "Function checks whether the string is ASCII."
    try:
        s.decode('ascii')
    except (UnicodeEncodeError, UnicodeDecodeError):
        return False
    return True

@retry(Exception, logger=log)
def parse(song, source):
    '''
    Function parses the source search page and returns the .mp3 links in it.
    @param song: Search string.
    @param source: Search website source. Value can be dilandau, mp3skull, youtube, seekasong.

    @return links: .mp3 url links.
    '''
    source = source.lower()
    if source == "dilandau":
        return parse_dilandau(song)
    elif source == "mp3skull":
        return parse_Mp3skull(song)
    elif source == "seekasong":  # compare against the lowercased name
        return parse_SeekASong(song)
    elif source == "youtube":
        return parse_Youtube(song)

    log.error('no source "%s". (from parse function in WebParser)' % source)
    return []

def parse_dilandau(song, pages=1):
    "Function connects to Dilandau.eu and returns the .mp3 links in it"
    if not isAscii(song): # Dilandau doesn't like unicode.
        log.warning("Song is not ASCII. Skipping on dilandau")
        return []

    links = []
    song = urllib2.quote(song.encode("utf8"))

    for i in range(pages):
        url = 'http://en.dilandau.eu/download_music/%s-%d.html' % (song.replace('-','').replace(' ','-').replace('--','-').lower(), i+1)
        log.debug("[Dilandau] Parsing %s... " % url)
        links.extend(GetLinks(url, returnMetaUrlObj=True))
    log.debug("[Dilandau] found %d links" % len(links))

    for metaUrl in links:
        metaUrl.source = "Dilandau"

    return links

def parse_Mp3skull(song, pages=1):
    "Function connects to mp3skull.com and returns the .mp3 links in it"
    links = []
    song = urllib2.quote(song.encode("utf8"))

    for i in range(pages):
        # http://mp3skull.com/mp3/how_i_met_your_mother.html
        url = 'http://mp3skull.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower())
        log.debug("[Mp3skull] Parsing %s... " % url)
        links.extend(GetLinks(url, returnMetaUrlObj=True))
    log.debug("[Mp3skull] found %d links" % len(links))

    for metaUrl in links:
        metaUrl.source = "Mp3skull"

    return links

def parse_SeekASong(song):
    "Function connects to seekasong.com and returns the .mp3 links in it"
    song = urllib2.quote(song.encode("utf8"))

    url = 'http://www.seekasong.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower())
    log.debug("[SeekASong] Parsing %s... " % url)
    links = GetLinks(url, returnMetaUrlObj=True)
    for metaUrl in links:
        metaUrl.source = "SeekASong"
    log.debug("[SeekASong] found %d links" % len(links))

    return links

def parse_Youtube(song, amount=10):
    '''
    Function searches a song in youtube.com and returns the clips in it using Youtube API.
    @param song: The search string.
    @param amount: Amount of clips to obtain.

    @return links: List of links.
    '''
    song = urllib2.quote(song.encode("utf8"))
    url = r"http://gdata.youtube.com/feeds/api/videos?q=%s&max-results=%d&v=2" % (song.replace(' ', '+'), amount)
    urlObj = urllib2.urlopen(url, timeout=4)
    data = urlObj.read()
    videos = xml.dom.minidom.parseString(data).getElementsByTagName('feed')[0].getElementsByTagName('entry')

    links = []
    for video in videos:
        youtube_watchurl = video.getElementsByTagName('link')[0].attributes.item(0).value
        links.append(get_youtube_hightest_quality_link(youtube_watchurl))

    return links

def get_youtube_hightest_quality_link(youtube_watchurl, priority=config.youtube_quality_priority):
    '''
    Function returns the highest quality link for a specific youtube clip.
    @param youtube_watchurl: The Youtube Watch Url.
    @param priority: A list representing the quality priority.

    @return MetaUrlObj: MetaUrl Object, or "" if no link is found.
    '''
    video_id = parse_qs(urlparse(youtube_watchurl).query)['v'][0]
    youtube_embedded_watchurl = "http://www.youtube.com/embed/%s?autoplay=1" % video_id

    d = get_youtube_dl_links(video_id)
    for x in priority:
        if x in d:
            return MetaUrl(d[x][0], 'youtube', d['VideoName'], x, youtube_embedded_watchurl)
    log.error("No Youtube link has been found in get_youtube_hightest_quality_link.")
    return ""

@retry(Exception, logger=log)
def get_youtube_dl_links(video_id):
    '''
    Function gets the download links for a youtube clip.
    This function parses the get_video_info format of youtube.

    @param video_id: Youtube Video ID.
    @return d: A dictionary of qualities as keys and urls as values.
    '''
    d = {}

    url = r"http://www.youtube.com/get_video_info?video_id=%s&el=vevo" % video_id

    urlObj = urllib2.urlopen(url, timeout=12)
    data = urlObj.read()
    data = urllib2.unquote(urllib2.unquote(urllib2.unquote(data)))
    data = data.replace(',url', '\nurl')
    data = data.split('\n')

    for line in data:
        if 'timedtext' in line or 'status=fail' in line or '<AdBreaks>' in line:
            continue

        try:
            url = line.split('&quality=')[0].split('url=')[1]
            quality = line.split('&quality=')[1].split('&')[0]
        except IndexError:  # line has no url=/quality= pair
            continue
        if quality in d:
            d[quality].append(url)
        else:
            d[quality] = [url]

    try:
        videoName = "|".join(data).split('&title=')[1].split('&')[0]
    except Exception, e:
        log.error("Could not parse VideoName out of get_video_info (%s)" % str(e))
        videoName = ""

    videoName = unicode(videoName, 'utf-8')
    d['VideoName'] = videoName.replace('+',' ').replace('--','-')
    return d


class NextList(object):
    "A list with a 'next' method."
    def __init__(self, l):
        self.l = l
        self.next_index = 0

    def next(self):
        if self.next_index < len(self.l):
            value = self.l[self.next_index]
            self.next_index += 1
            return value
        else:
            return None

    def isEOF(self):
        "Checks if the list has reached the end."
        return (self.next_index >= len(self.l))

class MetaUrl(object):
    "A URL data structure with extra metadata."
    def __init__(self, url, source="", videoName="", quality="", youtube_watchurl=""):
        self.url = str(url)
        self.source = source
        self.videoName = videoName # Youtube links only
        self.quality = quality # Youtube links only
        self.youtube_watchurl = youtube_watchurl # Youtube links only

    def __repr__(self):
        return "<MetaUrl '%s' | %s>" % (self.url, self.source)


def search(song, n, processes=config.search_processes):
    '''
    Function searches song and returns n valid .mp3 links.
    @param song: Search string.
    @param n: Number of songs.
    @param processes: Number of workers to launch in the pool.
    '''
    linksFromSources = []
    pool = multiprocessing.Pool(processes)

    args = [(song, source) for source in config.search_sources]
    imapObj = pool.imap_unordered(_parse_star, args)
    for i in range(len(args)):
        # wait up to 15 seconds for each source's result list
        linksFromSources.append(NextList(imapObj.next(15)))
    pool.terminate()

    links = []
    next_source = 0
    while len(links) < n and not all(x.isEOF() for x in linksFromSources):
        nextItem = linksFromSources[next_source].next()
        if nextItem:
            log.debug("added song %.80s from source ID %d (%s)" % (nextItem.url.split('/')[-1], next_source, nextItem.source))
            links.append(nextItem)

        if len(linksFromSources) == next_source+1:
            next_source = 0
        else:
            next_source += 1

    return links

def _parse_star(args): 
    return parse(*args) 
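The round-robin merge at the end of `search()` (one item from each source in turn, skipping exhausted sources and falsy entries, until `n` links are collected) could also be written with `zip_longest`. A minimal sketch; `round_robin` is a hypothetical name, and it takes plain lists rather than `NextList` objects:

```python
try:
    from itertools import zip_longest  # Python 3
except ImportError:
    from itertools import izip_longest as zip_longest  # Python 2


def round_robin(lists, n):
    """Take one item from each list in turn until n items are collected
    or every list is exhausted."""
    merged = []
    # each row holds the next item of every list; exhausted lists give None
    for row in zip_longest(*lists, fillvalue=None):
        for item in row:
            if len(merged) >= n:
                return merged
            if item:  # skip exhausted slots and falsy entries, like search()
                merged.append(item)
    return merged
```

Inside `search()` this would stand in for the `while` loop as something like `links = round_robin([nl.l for nl in linksFromSources], n)`, at the cost of the per-item debug logging.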

What does your code look like? It would be nice to have a reference.


Here is the code: http://pastebin.com/F8QVUtkP. It references many project files, but basically the main function is 'search()' on line 249. The exception in the question is raised by the statement 'pool = multiprocessing.Pool(processes)'. It does not happen when using the original multiprocessing module. – iTayb


@iTayb: What arguments are you passing as 'processes'?

Answer


I can't reproduce your problem on my machine. What is in your processes variable? Is it an int?

Python 2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)] on win32 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import multiprocessing.dummy as multiprocessing 
>>> pool = multiprocessing.Pool(5) 
>>> pool 
<multiprocessing.pool.ThreadPool object at 0x00C7DF90> 
>>> 

---- Edit ----

You probably also want to double-check whether your standard library has been messed up, and try a clean install of Python 2.7.3 in a different folder.

---- Edit 2 ----

You can quickly patch it like this:

import multiprocessing.dummy
import weakref
import threading

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        pool = multiprocessing.dummy.Pool(5)
        print str(pool)

w = Worker()
# give the thread the _children mapping that multiprocessing.dummy
# otherwise only sets on the main thread
w._children = weakref.WeakKeyDictionary()
w.start()
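The patch above works when you create the thread object yourself. The traceback in the question, however, names `_DummyThread`, which is what `threading.current_thread()` returns for a thread that was not started through the `threading` module (for example, one started by a GUI toolkit), so there may be no `Worker` instance to patch. A variant that patches whichever thread is current might look like this; `make_thread_pool` is a hypothetical helper, and `_children` is a private detail of `multiprocessing.dummy`, so treat it only as a stopgap for affected versions:

```python
import threading
import weakref
from multiprocessing.dummy import Pool


def make_thread_pool(processes):
    """Create a ThreadPool from any thread, including threads that were
    not started through the threading module."""
    current = threading.current_thread()
    # Affected versions (e.g. 2.7.3) expect the creating thread to carry the
    # _children mapping that multiprocessing.dummy only sets on the main thread.
    if not hasattr(current, "_children"):
        current._children = weakref.WeakKeyDictionary()
    return Pool(processes)
```

In `search()`, `make_thread_pool(processes)` would be called in place of `multiprocessing.Pool(processes)`.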

Your code works here. Interestingly enough, the code I uploaded actually works if you run it directly. What actually happens is that I have a GUI that runs a different thread (to keep the GUI from getting stuck). That thread runs the specific 'search' function that has the failing 'ThreadPool'. – iTayb


You're right, I guess Python didn't cover the case where a ThreadPool is created from a non-main thread. See Edit 2 for a workaround. Or, less hacky, you can create the Pool in your GUI thread and pass it to your worker thread to use. – xbtsw
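The less hacky alternative, creating the pool in the GUI (main) thread and handing it to the worker, could look like this minimal sketch; `search_worker` and the query strings are hypothetical placeholders:

```python
import threading
from multiprocessing.dummy import Pool


def search_worker(pool, queries, out):
    # Runs in a background thread but reuses a pool created elsewhere,
    # so no ThreadPool is constructed from this non-main thread.
    out.extend(pool.imap_unordered(str.upper, queries))


pool = Pool(4)  # created in the main (GUI) thread
results = []
worker = threading.Thread(target=search_worker, args=(pool, ["a", "b"], results))
worker.start()
worker.join()
pool.close()
pool.join()
```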


Thanks. I also filed a bug with the Python dev team: http://bugs.python.org/issue14881. Thanks a lot! – iTayb