Estoy trabajando en una aplicación web de Django que necesita consultar una base de datos PostgreSQL. Al implementar la concurrencia mediante la interfaz de Python threading, obtengo errores DoesNotExist
para los elementos consultados. Por supuesto, estos errores no ocurren cuando se realizan las consultas secuencialmente.Errores de base de datos en Django cuando se utiliza el subprocesamiento
Te voy a enseñar una prueba de unidad que escribí para demostrar el comportamiento inesperado:
class ThreadingTest(TestCase):
fixtures = ['demo_city',]
def test_sequential_requests(self):
"""
A very simple request to database, made sequentially.
A fixture for the cities has been loaded above. It is supposed to be
six cities in the testing database now. We will made a request for
each one of the cities sequentially.
"""
for number in range(1, 7):
c = City.objects.get(pk=number)
self.assertEqual(c.pk, number)
def test_threaded_requests(self):
"""
Now, to test the threaded behavior, we will spawn a thread for
retrieving each city from the database.
"""
threads = []
cities = []
def do_requests(number):
cities.append(City.objects.get(pk=number))
[threads.append(threading.Thread(target=do_requests, args=(n,))) for n in range(1, 7)]
[t.start() for t in threads]
[t.join() for t in threads]
self.assertNotEqual(cities, [])
Como se puede ver, la primera prueba realiza algunas peticiones de base de datos de forma secuencial, los cuales son de hecho trabajando sin ningún problema. La segunda prueba, sin embargo, realiza exactamente las mismas solicitudes, pero cada solicitud se genera en un hilo. En realidad, esta falla y devuelve una excepción DoesNotExist
.
La salida de la ejecución de esta unidad de pruebas es la siguiente:
test_sequential_requests (cesta.core.tests.threadbase.ThreadingTest) ... ok
test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest) ...
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
cities.append(City.objects.get(pk=number))
File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
return self.get_query_set().get(*args, **kwargs)
File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
% self.model._meta.object_name)
DoesNotExist: City matching query does not exist.
... otros hilos devuelve una salida similar ...
Exception in thread Thread-6:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
cities.append(City.objects.get(pk=number))
File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
return self.get_query_set().get(*args, **kwargs)
File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
% self.model._meta.object_name)
DoesNotExist: City matching query does not exist.
FAIL
======================================================================
FAIL: test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 52, in test_threaded_requests
self.assertNotEqual(cities, [])
AssertionError: [] == []
----------------------------------------------------------------------
Ran 2 tests in 0.278s
FAILED (failures=1)
Destroying test database for alias 'default' ('test_cesta')...
Recuerde que todo esto es sucediendo en una base de datos PostgreSQL, que se supone que es segura para subprocesos, no con el SQLite o similares. La prueba se ejecutó usando PostgreSQL también.
En este punto, estoy totalmente perdido acerca de lo que puede estar fallando. Alguna idea o sugerencia?
Gracias!
EDIT: Escribí una pequeña vista para comprobar si funciona correctamente. Aquí está el código de la vista:.
def get_cities(request):
queue = Queue.Queue()
def get_async_cities(q, n):
city = City.objects.get(pk=n)
q.put(city)
threads = [threading.Thread(target=get_async_cities, args=(queue, number)) for number in range(1, 5)]
[t.start() for t in threads]
[t.join() for t in threads]
cities = list()
while not queue.empty():
cities.append(queue.get())
return render_to_response('async/cities.html', {'cities': cities},
context_instance=RequestContext(request))
(Por favor, no tome en cuenta la locura de escribir la lógica de la aplicación dentro del código de la vista Recuerde que esto es sólo una prueba de concepto y no sería nunca se en la aplicación real)
El resultado es que el código funciona bien, las solicitudes se realizan con éxito en los hilos y la vista finalmente muestra las ciudades después de llamar a su URL.
Por lo tanto, creo que realizar consultas utilizando hilos solo será un problema cuando deba probar el código. En producción, funcionará sin ningún problema.
¿Alguna sugerencia útil para probar este tipo de código con éxito?
¿Estás seguro de que el dispositivo se importa? ¿Puedes pegar el registro que dice: "Fixture demo_city procesada" o algo así ... Ah, no importa ... simplemente no he leído la pregunta completa ... – Tisho