2011-09-26 9 views
18

Al igual que en otra publicación que hice, esto responde a esa publicación y crea una nueva pregunta.Crear conexión de base de datos y mantener en múltiples procesos (multiprocesamiento)

Recapitulación: Necesito actualizar cada registro en una base de datos espacial en la que tengo un conjunto de datos de puntos que superponen el conjunto de datos de los polígonos. Para cada característica de punto, quiero asignar una clave para relacionarla con la característica del polígono en la que se encuentra. Entonces, si mi punto 'Ciudad de Nueva York' se encuentra dentro del polígono de EE. UU. Y para el polígono de EE. UU. 'GID = 1' asignaré 'gid_fkey = 1' para mi punto Nueva York.

Bien, esto se ha logrado mediante el multiprocesamiento. He notado un aumento del 150% en la velocidad con esto, así que funciona. Pero creo que hay un montón de sobrecarga innecesaria ya que se requiere una conexión de DB para cada registro.

Así que aquí está el código:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task() 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self):   
     pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     pyConn.set_isolation_level(0) 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
    for w in consumers: 
     w.start() 

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConnX.set_isolation_level(0) 
    pyCursorX = pyConnX.cursor() 

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
    temp = pyCursorX.fetchall()  
    num_job = temp[0] 
    num_jobs = num_job[0] 

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
    cityIdListTuple = pyCursorX.fetchall()  

    cityIdListList = [] 

    for x in cityIdListTuple: 
     cityIdList.append(x[0]) 


    for i in xrange(num_jobs): 
     tasks.put(Task(cityIdList[i - 1])) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    while num_jobs: 
     result = results.get() 
     print result 
     num_jobs -= 1 

Parece ser entre 0,3 y 1,5 segundos por conexión como tengo medirlo con el módulo de 'tiempo'.

¿Hay alguna manera de hacer una conexión de base de datos por proceso y luego simplemente usar la información de city_id como una variable que puedo ingresar en una consulta para el cursor en este abrir? De esta manera hago cuatro procesos, cada uno con una conexión DB y luego dejo caer city_id de alguna manera para procesar.

Respuesta

31

intentar aislar la creación de la conexión en el constructor del Consumidor, a continuación, dar a la tarea ejecutada:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     self.pyConn.set_isolation_level(0) 


    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task(connection=self.pyConn) 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self, connection=None):   
     pyConn = connection 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 
+1

compañero que funcionaba un lujo. No tienes elogios para darte el visto bueno, pero ese código era absolutamente mágico. Deshacerse de las conexiones de DB constantes ha aumentado fácilmente la velocidad en otro 50%. Posiblemente más cerca del 100% en algunos casos. Gracias de nuevo. –

+0

@ ENE_: Me alegra que te haya ayudado :). Debe aceptar la respuesta, tiene derecho a hacerlo porque es el propietario de la pregunta. –

+0

Bien, tengo que admitir que pensé que debería presionar la flecha hacia arriba en lugar del tic. 'Tick of approval' fue un desafortunado autodenunciante = D –

Cuestiones relacionadas