2010-09-07 48 views
63

¿Hay alguna manera de hacer que SQLAlchemy haga una inserción masiva en lugar de insertar cada objeto individualmente? es decir,Inserción masiva con SQLAlchemy ORM

haciendo:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3) 

en lugar de:

INSERT INTO `foo` (`bar`) VALUES (1) 
INSERT INTO `foo` (`bar`) VALUES (2) 
INSERT INTO `foo` (`bar`) VALUES (3) 

He acaba de convertir algo de código para utilizar sqlalchemy en lugar de SQL prima y aunque ahora es mucho más agradable trabajar con él parece ser más lento ahora (hasta un factor de 10), me pregunto si esta es la razón.

Puede ser que podría mejorar la situación usando sesiones de manera más eficiente. Por el momento tengo autoCommit=False y hago un session.commit() después de haber agregado algunas cosas. Aunque esto parece causar que los datos se vuelvan obsoletos si el DB se cambia en otro lugar, como si incluso si hiciera una nueva consulta, ¿todavía obtuviera resultados viejos?

Gracias por su ayuda!

+1

Esto podría ayudar: http://stackoverflow.com/questions/270879/efficiently-updating-database-using-sqlalchemy-orm/278606#278606 –

+0

Nick, entiendo que esto es un * muy * antiguo puesto . ¿Sería posible actualizar el título a algo * correcto * como "inserción de múltiples registros con SQLAlchemy ORM". Las instrucciones de inserción de múltiples registros como la que ha proporcionado son bastante diferentes de las operaciones de carga masiva a nivel de base de datos. Las inserciones masivas están destinadas a la carga de datos a 1k +, generalmente de grandes conjuntos de datos y hechos por administradores de aplicaciones, no por operaciones REST o código de nivel de aplicación ... Usemos nuestra nomenclatura correctamente. – W4t3randWind

Respuesta

25

Hasta donde yo sé, no hay forma de que el ORM emita inserciones en bloque. Creo que la razón subyacente es que SQLAlchemy necesita realizar un seguimiento de la identidad de cada objeto (es decir, nuevas claves primarias) y las inserciones masivas interfieren con eso. Por ejemplo, asumiendo que su mesa de foo contiene una columna de id y se asigna a una clase Foo:

x = Foo(bar=1) 
print x.id 
# None 
session.add(x) 
session.flush() 
# BEGIN 
# INSERT INTO foo (bar) VALUES(1) 
# COMMIT 
print x.id 
# 1 

Desde SQLAlchemy recogió el valor de x.id sin emitir otra consulta, se puede inferir que se obtuvo el valor directamente desde el INSERT declaración. Si no es necesario el posterior acceso a los objetos creados a través de los mismos casos, puede omitir la capa ORM para su inserción:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}]) 
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,)) 

SQLAlchemy no puede coincidir con estas nuevas filas con objetos existentes, por lo tendrá que consultarlos nuevamente para cualquier operación posterior.

En lo que respecta a los datos obsoletos, es útil recordar que la sesión no tiene una forma incorporada de saber cuándo se cambia la base de datos fuera de la sesión. Para acceder a datos modificados externamente a través de instancias existentes, las instancias deben estar marcadas como expiradas. Esto ocurre por defecto en session.commit(), pero se puede hacer manualmente llamando al session.expire_all() o session.expire(instance). Un ejemplo (SQL omitido):

x = Foo(bar=1) 
session.add(x) 
session.commit() 
print x.bar 
# 1 
foo.update().execute(bar=42) 
print x.bar 
# 1 
session.expire(x) 
print x.bar 
# 42 

session.commit() expira x, por lo que la primera sentencia print implícitamente se abre una nueva transacción y re-consultas atributos x 's. Si comenta la primera declaración impresa, notará que la segunda ahora recoge el valor correcto, porque la nueva consulta no se emite hasta después de la actualización.

Esto tiene sentido desde el punto de vista del aislamiento transaccional: solo debe elegir modificaciones externas entre transacciones. Si esto le está causando problemas, le sugiero que aclare o vuelva a pensar en los límites de las transacciones de su aplicación en lugar de llegar inmediatamente al session.expire_all().

+0

Gracias por su respuesta, voy a intentarlo. WRT el problema de vencimiento, lo que vi no era lo mismo. Estoy usando una sesión de ámbito en turbogears. Al realizar una consulta getSession(). (Foo) .filter .... all() devolvió cosas diferentes según la solicitud, y tampoco devolvió los registros actualizados que estaban en el archivo db hasta que lo reinicié. Solucioné este problema haciendo un autocommit = True y añadiendo algo que elimine() d la sesión después de que se completara la solicitud (supongo que debes hacerlo de todos modos). –

+0

Supongo que devolvió cosas diferentes según la solicitud porque tenía una sesión de ámbito por subproceso en el grupo y las sesiones estaban en estados diferentes. Sin embargo, parecía un poco extraño que sa no obtuviera nuevos datos después de una nueva solicitud. Supongo que no estoy entendiendo lo que autocommit = False está haciendo –

+0

Con 'autocommit = False', creo que deberías estar llamando' session.commit() 'al completar la solicitud (no estoy familiarizado con TurboGears, así que ignora esto si eso se maneja para ti a nivel del marco). Además de asegurarse de que sus cambios hayan llegado a la base de datos, esto caducaría todo en la sesión. La próxima transacción no comenzará hasta el próximo uso de esa sesión, por lo que las futuras solicitudes en el mismo subproceso no verán datos obsoletos. – dhaffey

7

Se agregó soporte directo a SQLAlchemy a partir de la versión 0.8

Según el docs, connection.execute(table.insert().values(data)) debería hacer el truco. (Tenga en cuenta que esto es no lo mismo que connection.execute(table.insert(), data) lo que se traduce en muchas inserciones de fila individuales a través de una llamada a executemany). En cualquier cosa menos una conexión local de la diferencia en el rendimiento puede ser enorme.

4

Esta es una manera:

values = [1, 2, 3] 
Foo.__table__.insert().execute([{'bar': x} for x in values]) 

Esto insertará como esto:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3) 

de referencia: La SQLAlchemy FAQ incluye puntos de referencia para varios cometer métodos.

75

SQLAlchemy introdujo en la versión que 1.0.0:

Bulk operations - SQLAlchemy docs

Con estas operaciones, ahora se puede hacer las inserciones o actualizaciones!

Por ejemplo, se puede hacer:

s = Session() 
objects = [ 
    User(name="u1"), 
    User(name="u2"), 
    User(name="u3") 
] 
s.bulk_save_objects(objects) 

Aquí, se hará una inserción masiva.

+14

También necesita s.commit() para guardar realmente los registros (me tomó un poco resolver esto). –

+2

Intenté esto con sqlachemy 1.0.11 y todavía hace 3 instrucciones de inserción. Pero es mucho más rápido que las operaciones normales de orm. – zidarsk8

+1

Si bien no es pertinente para la pregunta de OP, vale la pena mencionar que esto rompe ciertas características del ORM. http://docs.sqlalchemy.org/en/rel_1_0/orm/persistence_techniques.html#orm-compatibility – dangel

10

Los documentos SQLAlchemy tienen una gran valoración crítica sobre el desempeño de las diversas técnicas que se pueden utilizar para las inserciones:

ORM, básicamente, no están destinados a las inserciones de alto rendimiento - esta es la razón por SQLAlchemy ofrece el núcleo, además de la ORM como un componente de primera clase.

Para el caso de uso de inserciones masivas rápidas, la generación de SQL y el sistema de ejecución que el ORM crea encima forma parte del Núcleo. El uso de este sistema directamente, podemos producir un inserto que es competitivo con utilizando directamente la API de base de datos en bruto.

Alternativamente, el SQLAlchemy ORM ofrece la Operaciones de baño a granel de métodos, que proporcionan ganchos en subsecciones de la unidad de trabajo proceso con el fin de emitir INSERT Core-nivel y ACTUALIZACIÓN construcciones con un pequeño grado de basado en ORM automatización.

El siguiente ejemplo ilustra las pruebas basadas en el tiempo para varios métodos diferentes de insertar filas, yendo del más automatizado al menos. Con CPython 2.7, tiempos de ejecución observado:

classics-MacBook-Pro:sqlalchemy classic$ python test.py 
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs 
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs 
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs 
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs 
sqlite3: Total time for 100000 records 0.487842082977 sec 

Guión:

import time 
import sqlite3 

from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import Column, Integer, String, create_engine 
from sqlalchemy.orm import scoped_session, sessionmaker 

Base = declarative_base() 
DBSession = scoped_session(sessionmaker()) 
engine = None 


class Customer(Base): 
    __tablename__ = "customer" 
    id = Column(Integer, primary_key=True) 
    name = Column(String(255)) 


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'): 
    global engine 
    engine = create_engine(dbname, echo=False) 
    DBSession.remove() 
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) 
    Base.metadata.drop_all(engine) 
    Base.metadata.create_all(engine) 


def test_sqlalchemy_orm(n=100000): 
    init_sqlalchemy() 
    t0 = time.time() 
    for i in xrange(n): 
     customer = Customer() 
     customer.name = 'NAME ' + str(i) 
     DBSession.add(customer) 
     if i % 1000 == 0: 
      DBSession.flush() 
    DBSession.commit() 
    print(
     "SQLAlchemy ORM: Total time for " + str(n) + 
     " records " + str(time.time() - t0) + " secs") 


def test_sqlalchemy_orm_pk_given(n=100000): 
    init_sqlalchemy() 
    t0 = time.time() 
    for i in xrange(n): 
     customer = Customer(id=i+1, name="NAME " + str(i)) 
     DBSession.add(customer) 
     if i % 1000 == 0: 
      DBSession.flush() 
    DBSession.commit() 
    print(
     "SQLAlchemy ORM pk given: Total time for " + str(n) + 
     " records " + str(time.time() - t0) + " secs") 


def test_sqlalchemy_orm_bulk_insert(n=100000): 
    init_sqlalchemy() 
    t0 = time.time() 
    n1 = n 
    while n1 > 0: 
     n1 = n1 - 10000 
     DBSession.bulk_insert_mappings(
      Customer, 
      [ 
       dict(name="NAME " + str(i)) 
       for i in xrange(min(10000, n1)) 
      ] 
     ) 
    DBSession.commit() 
    print(
     "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) + 
     " records " + str(time.time() - t0) + " secs") 


def test_sqlalchemy_core(n=100000): 
    init_sqlalchemy() 
    t0 = time.time() 
    engine.execute(
     Customer.__table__.insert(), 
     [{"name": 'NAME ' + str(i)} for i in xrange(n)] 
    ) 
    print(
     "SQLAlchemy Core: Total time for " + str(n) + 
     " records " + str(time.time() - t0) + " secs") 


def init_sqlite3(dbname): 
    conn = sqlite3.connect(dbname) 
    c = conn.cursor() 
    c.execute("DROP TABLE IF EXISTS customer") 
    c.execute(
     "CREATE TABLE customer (id INTEGER NOT NULL, " 
     "name VARCHAR(255), PRIMARY KEY(id))") 
    conn.commit() 
    return conn 


def test_sqlite3(n=100000, dbname='sqlite3.db'): 
    conn = init_sqlite3(dbname) 
    c = conn.cursor() 
    t0 = time.time() 
    for i in xrange(n): 
     row = ('NAME ' + str(i),) 
     c.execute("INSERT INTO customer (name) VALUES (?)", row) 
    conn.commit() 
    print(
     "sqlite3: Total time for " + str(n) + 
     " records " + str(time.time() - t0) + " sec") 

if __name__ == '__main__': 
    test_sqlalchemy_orm(100000) 
    test_sqlalchemy_orm_pk_given(100000) 
    test_sqlalchemy_orm_bulk_insert(100000) 
    test_sqlalchemy_core(100000) 
    test_sqlite3(100000) 
5

SQLAlchemy introdujo que en la versión 1.0.0:

Bulk operations - SQLAlchemy docs

Con estas operaciones, ahora se puede hacer inserciones a granel o actualizaciones!

Por ejemplo (si desea que el menor sobrecarga para las inserciones de mesa simples), puede utilizar Session.bulk_insert_mappings():

loadme = [ 
     (1, 'a') 
    , (2, 'b') 
    , (3, 'c') 
    ] 

dicts = [] 
for i in range(len(loadme)): 
    dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1])) 

s = Session() 
s.bulk_insert_mappings(Foo, dicts) 
s.commit() 

O, si lo desea, omita los loadme tuplas y escribir los diccionarios directamente en dicts (pero Me resulta más fácil dejar todo el palabreo fuera de los datos y cargar una lista de diccionarios en un bucle).

3

La respuesta de Piere es correcta, pero un problema es que bulk_save_objects de forma predeterminada no devuelve las claves principales de los objetos, si eso le preocupa. Establezca return_defaults en True para obtener este comportamiento.

La documentación es here.

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')] 
session.bulk_save_objects(foos, return_defaults=True) 
for foo in foos: 
    assert foo.id is not None 
session.commit() 
10

Normalmente lo hago usando add_all.

from app import session 
from models import User 

objects = [User(name="u1"), User(name="u2"), User(name="u3")] 
session.add_all(objects) 
session.commit() 
Cuestiones relacionadas