2010-06-23 5 views
5

Imagine un árbol binario invertido con nodos A, B, C, D, E, F en el nivel 0. nodos G, H, I en el nivel 1, nodo J en el nivel 2, y el nodo K en el nivel 3.Implementación de un tipo especial de cola de multiprocesamiento en Python

nivel 1: G = func (a, B), H = func (C, D), i = func (e, F)

nivel 2: J = func (G, H)

Nivel 3: K = func (J, I).

Cada par de nodos en el Nivel 0 debe procesarse en orden, Cada par de nodos en el Nivel 1 puede procesarse en cualquier orden pero el resultado debe procesarse como se muestra en el siguiente nivel, y así sucesivamente hasta que terminemos con el resultado final, K.

El problema real es un problema de geometría computacional en el que una secuencia de sólidos se fusionan. A es adyacente a B, que es adyacente a C, y así sucesivamente. El fusible resultante de A y B (G) es adyacente al fusible de C y D (H). El fusible resultante de J e I (K) es el resultado final. Por lo tanto, no puede fusionar G e I ya que no son adyacentes. Si el número de nodos en un nivel no es una potencia de 2, terminará con una entidad colgante que debe procesarse un nivel más.

Dado que el proceso de fusibles es computacionalmente costoso y requiere mucha memoria, pero es muy paralelo, me gustaría utilizar el paquete de multiprocesamiento de Python y algún tipo de cola. Después de calcular G = func (A, B), me gustaría presionar el resultado G en la cola para el cálculo posterior de J = func (G, H). Cuando la cola está vacía, el último resultado es el resultado final. Tenga en cuenta que el mp.queue no necesariamente producirá resultados FIFO, ya que I = func (E, F) puede terminar antes de H = func (C, D)

He encontrado algunas (malas) soluciones pero estoy seguro de que hay una solución elegante más allá de mi alcance. Sugerencias?

+0

¿Por qué tiene que procesarse el nivel = 0 en orden, pero el nivel = 1 se puede procesar en cualquier orden? ¿No es suficiente elegir dos hojas conocidas y fusionarlas en un solo nodo? – Stephen

+0

No estoy correcto al decir que los nodos deben procesarse en orden. Deben procesarse en términos de adyacencia. A es adyacente a B es adyacente a C y así sucesivamente para el nivel 0. Puedes hacer func (A, B) o func (B, C) pero no func (A, C). Del mismo modo, en el nivel 1, G es adyacente a H y es adyacente a I. Puedes hacer func (G, H) o func (H, I) pero no func (G, I). – user90855

Respuesta

0

No pude idear un diseño inteligente para una cola, pero puede reemplazar fácilmente la cola con un proceso más, que en mi ejemplo llamé al WorkerManager. Este proceso reúne los resultados de todos los procesos Worker e inicia nuevos trabajadores solo si hay dos paquetes de datos adyacentes a la espera de ser procesados. De esta forma, nunca intentarás unir resultados no adyacentes, por lo que puedes ignorar los "niveles" y disparar el cálculo del siguiente par tan pronto como esté listo.

from multiprocessing import Process, Queue 

class Result(object): 
    '''Result from start to end.''' 
    def __init__(self, start, end, data): 
     self.start = start 
     self.end = end 
     self.data = data 


class Worker(Process): 
    '''Joins two results into one result.''' 
    def __init__(self, result_queue, pair): 
     self.result_queue = result_queue 
     self.pair = pair 
     super(Worker, self).__init__() 

    def run(self): 
     left, right = self.pair 
     result = Result(left.start, right.end, 
         '(%s, %s)' % (left.data, right.data)) 
     self.result_queue.put(result) 


class WorkerManager(Process): 
    ''' 
    Takes results from result_queue, pairs them 
    and assigns workers to process them. 
    Returns final result into final_queue. 
    ''' 
    def __init__(self, result_queue, final_queue, start, end): 
     self._result_queue = result_queue 
     self._final_queue = final_queue 
     self._start = start 
     self._end = end 
     self._results = [] 
     super(WorkerManager, self).__init__() 

    def run(self): 
     while True: 
      result = self._result_queue.get() 
      self._add_result(result) 
      if self._has_final_result(): 
       self._final_queue.put(self._get_final_result()) 
       return 
      pair = self._find_adjacent_pair() 
      if pair: 
       self._start_worker(pair) 

    def _add_result(self, result): 
     self._results.append(result) 
     self._results.sort(key=lambda result: result.start) 

    def _has_final_result(self): 
     return (len(self._results) == 1 
       and self._results[0].start == self._start 
       and self._results[0].end == self._end) 

    def _get_final_result(self): 
     return self._results[0] 

    def _find_adjacent_pair(self): 
     for i in xrange(len(self._results) - 1): 
      left, right = self._results[i], self._results[i + 1] 
      if left.end == right.start: 
       self._results = self._results[:i] + self._results[i + 2:] 
       return left, right 

    def _start_worker(self, pair): 
     worker = Worker(self._result_queue, pair) 
     worker.start() 

if __name__ == '__main__': 
    DATA = [Result(i, i + 1, str(i)) for i in xrange(6)] 
    result_queue = Queue() 
    final_queue = Queue() 
    start = 0 
    end = len(DATA) 
    man = WorkerManager(result_queue, final_queue, start, end) 
    man.start() 
    for res in DATA: 
     result_queue.put(res) 
    final = final_queue.get() 
    print final.start 
    # 0 
    print final.end 
    # 6 
    print final.data 
    # For example: 
    # (((0, 1), (2, 3)), (4, 5)) 

Para mi ejemplo, he usado un simple Worker que devuelve datos que figuran entre paréntesis, separados por una coma, pero se puede poner cualquier cálculo en ese país. En mi caso, el resultado final fue (((0, 1), (2, 3)), (4, 5)), lo que significa que el algoritmo calculó (0, 1) y (2, 3) antes de calcular ((0, 1), (2, 3)) y luego se unió al resultado con (4, 5). Espero que esto sea lo que estabas buscando.

+0

me ocurrió con una solución que se parece a: fusor def (formas): shape1_id, shape1 = formas [0] shape2_id, shape2 = formas [1] fusionado = OCC.BRepAlgoAPI.BRepAlgoAPI_Fuse (shape1, shape2).Forma() return ((shape1_id, shape2_id), fusionada) results = [(i, a) para i, a en enumerate (slices)] while len (resultados)> 1: P = processing.Pool (7 results = P.map (fuser, [(a, b) para a, b en zip (resultados [:: 2], resultados [1 :: 2])]) results.sort (clave = resultado lambda: resultado [0]) – user90855

Cuestiones relacionadas