Al igual que en otra publicación que hice, esto responde a esa publicación y crea una nueva pregunta.Crear conexión de base de datos y mantener en múltiples procesos (multiprocesamiento)
Recapitulación: Necesito actualizar cada registro en una base de datos espacial en la que tengo un conjunto de datos de puntos que superponen el conjunto de datos de los polígonos. Para cada característica de punto, quiero asignar una clave para relacionarla con la característica del polígono en la que se encuentra. Entonces, si mi punto 'Ciudad de Nueva York' se encuentra dentro del polígono de EE. UU. Y para el polígono de EE. UU. 'GID = 1' asignaré 'gid_fkey = 1' para mi punto Nueva York.
Bien, esto se ha logrado mediante el multiprocesamiento. He notado un aumento del 150% en la velocidad con esto, así que funciona. Pero creo que hay un montón de sobrecarga innecesaria ya que se requiere una conexión de DB para cada registro.
Así que aquí está el código:
import multiprocessing, time, psycopg2
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a):
self.a = a
def __call__(self):
pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()
procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
pyCursor1.execute(procQuery)
print 'What is self?'
print self.a
return self.a
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
w.start()
pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()
pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')
temp = pyCursorX.fetchall()
num_job = temp[0]
num_jobs = num_job[0]
pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
cityIdListTuple = pyCursorX.fetchall()
cityIdListList = []
for x in cityIdListTuple:
cityIdList.append(x[0])
for i in xrange(num_jobs):
tasks.put(Task(cityIdList[i - 1]))
for i in xrange(num_consumers):
tasks.put(None)
while num_jobs:
result = results.get()
print result
num_jobs -= 1
Parece ser entre 0,3 y 1,5 segundos por conexión como tengo medirlo con el módulo de 'tiempo'.
¿Hay alguna manera de hacer una conexión de base de datos por proceso y luego simplemente usar la información de city_id como una variable que puedo ingresar en una consulta para el cursor en este abrir? De esta manera hago cuatro procesos, cada uno con una conexión DB y luego dejo caer city_id de alguna manera para procesar.
compañero que funcionaba un lujo. No tienes elogios para darte el visto bueno, pero ese código era absolutamente mágico. Deshacerse de las conexiones de DB constantes ha aumentado fácilmente la velocidad en otro 50%. Posiblemente más cerca del 100% en algunos casos. Gracias de nuevo. –
@ ENE_: Me alegra que te haya ayudado :). Debe aceptar la respuesta, tiene derecho a hacerlo porque es el propietario de la pregunta. –
Bien, tengo que admitir que pensé que debería presionar la flecha hacia arriba en lugar del tic. 'Tick of approval' fue un desafortunado autodenunciante = D –