2009-08-25 23 views
8

Estoy usando SQLAlchemy con un backend de Postgres para hacer una inserción o actualización masiva. Para intentar mejorar el rendimiento, intento comprometerme solo una vez cada mil filas más o menos:¿Cómo realizo eficientemente una inserción o actualización masiva con SQLAlchemy?

trans = engine.begin() 
    for i, rec in enumerate(records): 
    if i % 1000 == 0: 
     trans.commit() 
     trans = engine.begin() 
    try: 
     inserter.execute(...) 
    except sa.exceptions.SQLError: 
     my_table.update(...).execute() 
trans.commit() 

Sin embargo, esto no funciona. Parece que cuando INSERT falla, deja las cosas en un estado extraño que impide que ocurra la ACTUALIZACIÓN. ¿Está revertiendo automáticamente la transacción? Si es así, ¿se puede detener esto? No quiero que se retrotraiga toda la transacción en caso de que haya un problema, por lo que estoy intentando detectar la excepción en primer lugar.

El mensaje de error que recibo, por cierto, es "sqlalchemy.exc.InternalError: (InternalError) la transacción actual se cancela, los comandos se ignoran hasta el final del bloque de transacción", y ocurre en la actualización(). Execute () llamada.

Respuesta

5

Se está produciendo un comportamiento extraño de Postgresql específico: si se produce un error en una transacción, obliga a deshacer toda la transacción. Considero que esto es un error de diseño de Postgres; se necesita bastante contorsionismo de SQL para funcionar en algunos casos.

Una solución es hacer primero la ACTUALIZACIÓN. Detecta si realmente modificó una fila mirando cursor.rowcount; si no modificó ninguna fila, no existía, así que haga el INSERTAR. (Esto será más rápido si se actualiza con más frecuencia que se inserta, por supuesto.)

Otra solución consiste en utilizar puntos de retorno:

SAVEPOINT a; 
INSERT INTO ....; 
-- on error: 
ROLLBACK TO SAVEPOINT a; 
UPDATE ...; 
-- on success: 
RELEASE SAVEPOINT a; 

Esto tiene un serio problema para el código de calidad de producción: hay que detectar el error con precisión. Es de suponer que espera alcanzar una restricción única, pero puede golpear algo inesperado, y puede ser casi imposible distinguir de manera confiable el error esperado del inesperado. Si esto llega a la condición de error incorrectamente, dará lugar a problemas oscuros donde nada se actualizará o insertará y no se verá ningún error. Sé muy cuidadoso con esto. Puede limitar el caso de error mirando el código de error de Postgresql para asegurarse de que es el tipo de error que está esperando, pero el problema potencial todavía está allí.

Finalmente, si realmente quiere hacer batch-insert-or-update, realmente quiere hacer muchos de ellos en unos pocos comandos, no un elemento por comando. Esto requiere SQL más engañoso: SELECCIONAR anidado dentro de un INSERT, filtrando los ítems correctos para insertar y actualizar.

+1

"Si ocurre un error en una transacción, obliga a que se revierte toda la transacción. Considero que esto es un error de diseño de Postgres". - ¿No es este el objetivo de las transacciones? De [Wikipedia] (http: //en.wikipedia.org/wiki/Database_transaction): "Las transacciones proporcionan una proposición 'todo o nada', que establece que cada unidad de trabajo realizada en una base de datos debe completarse en su totalidad o no tener ningún efecto en absoluto". – spiffytech

+0

@Spiffytech Buena respuesta. Esto realmente me hizo jadear. –

4

Este error es de PostgreSQL. PostgreSQL no le permite ejecutar comandos en la misma transacción si un comando crea un error. Para solucionarlo, puede usar transacciones anidadas (implementadas usando puntos de almacenamiento SQL) a través del conn.begin_nested(). Aquí hay algo que podría funcionar. Hice que el código utilizara conexiones explícitas, factoricé la parte fragmentada e hice que el código utilizara el administrador de contexto para administrar las transacciones correctamente.

from itertools import chain, islice 
def chunked(seq, chunksize): 
    """Yields items from an iterator in chunks.""" 
    it = iter(seq) 
    while True: 
     yield chain([it.next()], islice(it, chunksize-1)) 

conn = engine.commit() 
for chunk in chunked(records, 1000): 
    with conn.begin(): 
     for rec in chunk: 
      try: 
       with conn.begin_nested(): 
        conn.execute(inserter, ...) 
      except sa.exceptions.SQLError: 
       conn.execute(my_table.update(...)) 

Esto todavía no tendrá un rendimiento estelar debido a la sobrecarga de transacción anidada. Si desea un mejor rendimiento, intente detectar qué filas crearán errores de antemano con una consulta de selección y utilice executemany support (execute puede tomar una lista de dicts si todas las inserciones usan las mismas columnas). Si necesita manejar actualizaciones concurrentes, igual tendrá que manejar el error ya sea reintentando o volviendo a insertar uno por uno.

Cuestiones relacionadas