2011-03-13 22 views
28

¿Cómo puedo hacer un postre en bulto en pymongo? Quiero actualizar un montón de entradas y hacerlas de una en una es muy lento.Fast or Bulk Upsert en pymongo

La respuesta a una pregunta casi idéntica está aquí: Bulk update/upsert in MongoDB?

La respuesta aceptada en realidad no responde a la pregunta. Simplemente da un enlace a la CLI de mongo para hacer importaciones/exportaciones.

También estaría abierto a que alguien explique por qué no es posible realizar una restauración masiva/no es una buena práctica, pero explique cuál es la solución preferida para este tipo de problema.

Gracias!

Respuesta

4

La respuesta sigue siendo la misma: no se admite el envío masivo de mensajes.

+2

Esto es muy triste. – ComputationalSocialScience

+5

Esto ya no es el caso. Ver la respuesta de Kevin. –

+0

Bulk.find.upsert() https://docs.mongodb.com/manual/reference/method/Bulk.find.upsert/index.html – Tanuj

1

Puede actualizar todos los documentos que coincidan con su especificación de consulta utilizando multi = True.

Hay un error here sobre hacer un lote de comandos de la manera que desee.

27

MongoDB 2.6+ tiene soporte para operaciones masivas. Esto incluye inserciones en bloque, actualizaciones, actualizaciones, etc. El objetivo de esto es reducir/eliminar las demoras de la latencia de ida y vuelta de las operaciones de registro por registro ('documento por documento' es correcto).

Entonces, ¿cómo funciona esto? Ejemplo en Python, porque eso es lo que estoy trabajando.

>>> import pymongo 
>>> pymongo.version 
'2.7rc0' 

Para utilizar esta función, se crea un objeto 'a granel', añadir documentos a la misma, a continuación, llamar ejecutar en él y se enviará todas las actualizaciones En seguida. Advertencias: El tamaño BSON de las operaciones recopiladas (suma de los bsonsizes) no puede sobrepasar el límite de tamaño de documento de 16 MB. Por supuesto, el número de operaciones puede variar significativamente, su millaje puede variar.

Ejemplo en Pymongo de operación upsert granel:

import pymongo 
conn = pymongo.MongoClient('myserver', 8839) 
db = conn['mydbname'] 
coll = db.myCollection 
bulkop = coll.initialize_ordered_bulk_op() 
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}) 
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}) 
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}) 
retval = bulkop.execute() 

Este es el método esencial. Más información disponible en:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

+1

no hay alguna forma de evitar hacer un hallazgo por cada operación? Considere que quiero añadir una lista de objetos json que están incrustados en un documento, de esta manera, tengo que encontrar ese documento cada vez y añadir, esto no parece eficiente. – SpiXel

+0

Esta es solo la forma de escribir de MongoDB, no es mi idea. En cualquier actualización o actualización, debe encontrar los documentos que desea actualizar y esa búsqueda especifica sobre cuáles actuar. TAMBIÉN, se supone que es eficiente para grandes grupos de acciones, pero no necesariamente para su aplicación específica, así que, como dicen, su millaje puede variar. –

+0

La sintaxis 'initialize_ordered_bulk_op()' está ahora en desuso: http://api.mongodb.com/python/current/changelog.html?highlight=initialize_ordered_bulk_op – duhaime

17

lanzamientos modernos de pymongo (mayor que 3.x) envuelven las operaciones a granel en una interfaz consistente que rebaja en donde la versión del servidor no admite operaciones masivas. Esto ahora es consistente en los controladores oficialmente compatibles con MongoDB.

Por lo tanto, el método preferido para la codificación es usar bulk_write() en su lugar, donde utiliza una UpdateOne otra acción de operación apropiada en su lugar.Y ahora, por supuesto, se prefiere usar las listas de lenguaje natural en lugar de un constructor específico

La traducción directa de la antigua documention:

from pymongo import UpdateOne 

operations = [ 
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True), 
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True), 
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True) 
] 

result = collection.bulk_write(operations) 

o el clásico circuito de transformación de documentos:

import random 
from pymongo import UpdateOne 

random.seed() 

operations = [] 

for doc in collection.find(): 
    # Set a random number on every document update 
    operations.append(
     UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } }) 
    ) 

    # Send once every 1000 in batch 
    if (len(operations) == 1000): 
     collection.bulk_write(operations,ordered=False) 
     operations = [] 

if (len(operations) > 0): 
    collection.bulk_write(operations,ordered=False) 

El resultado devuelto es de BulkWriteResult que contendrá los contadores de documentos adaptados y actualizados, así como los valores devueltos _id para cualquier "upserts" que se producen.

hay un poco de un error sobre el tamaño de la matriz de las operaciones a granel. La solicitud real tal como se envió al servidor no puede superar el límite de BSON de 16 MB, ya que ese límite también se aplica a la "solicitud" enviada al servidor que también utiliza el formato BSON.

Sin embargo que no gobierna el tamaño de la matriz de solicitud que se puede construir, ya que sólo se enviarán las operaciones reales y se procesan en lotes de 1000 de todos modos. La única restricción real es que esas 1000 instrucciones de operación en sí mismas no crean realmente un documento BSON de más de 16 MB. Que de hecho es una orden bastante difícil.

El concepto general de los métodos a granel es "menos tráfico", como resultado de enviar cosas a la vez y sólo se trata de una respuesta del servidor. La reducción de esa sobrecarga adjunta a cada solicitud de actualización ahorra mucho tiempo.

+1

Disculpe, pero no entiendo, ¿por qué divide sus lotes en su ciclo si sabe que su controlador lo hace por sí mismo. Quiero decir en su ejemplo, 1000 documentos de 'UpdateOne ({" _id ": doc [" _ id "]}, {" $ set ": {" random ": random.randint (0,10)}})' son menos de 16 Mb y es obvio, por lo tanto, Pymongo puede dividir sus datos y la solicitud será exitosa. ¿Por qué divide sus datos con sus manos? –

+2

@Budulianin porque está iterando un cursor y es mucho y antipatrón para cargar toda la colección en la memoria. Eso es básicamente por qué están agrupados en un tamaño razonable.Además, esto tiene que pasar por alto, y especialmente teniendo en cuenta los entornos asíncronos (donde la técnica básica sigue siendo la misma), básicamente puede enviar datos a través del cable y construir otro lote mientras espera el reconocimiento. Si crees que ganas algo al hacer "un lote enorme" en lugar de "varios razonables", entonces en la mayoría de los casos diría que es poco probable –

0

actualización masiva más rápido con Python 3.5 +, motor y asyncio:

import asyncio 
import datetime 
import logging 
import random 
import time 

import motor.motor_asyncio 
import pymongo.errors 


async def execute_bulk(bulk): 
    try: 
     await bulk.execute() 
    except pymongo.errors.BulkWriteError as err: 
     logging.error(err.details) 


async def main(): 
    cnt = 0 
    bulk = db.initialize_unordered_bulk_op() 
    tasks = [] 
    async for document in db.find({}, {}, no_cursor_timeout=True): 
     cnt += 1 
     bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}}) 
     if not cnt % 1000: 
      task = asyncio.ensure_future(execute_bulk(bulk)) 
      tasks.append(task) 
      bulk = db.initialize_unordered_bulk_op() 
    if cnt % 1000: 
     task = asyncio.ensure_future(bulk.execute(bulk)) 
     tasks.append(task) 
    logging.info('%s processed', cnt) 
    await asyncio.gather(*tasks) 


logging.basicConfig(level='INFO')  
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection'] 
start_time = time.time() 
loop = asyncio.get_event_loop() 
try: 
    loop.run_until_complete(main()) 
finally: 
    execution_time = time.time() - start_time 
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time)) 
+0

¿Por qué el módulo 1000? Pensé que las operaciones masivas de Mongo 'automáticamente' se ejecutan en 2000 operaciones en un intento ... – duhaime

+1

@duhaime Buena pregunta. Puedo consultar [documentación] (https://docs.mongodb.com/manual/reference/limits/#Bulk-Operation-Size), pero se ve como [límite de 16 MB para BSON] (http://stackoverflow.com/ a/24238349/4249707) será un valor más preciso. Desafortunadamente, no tengo idea de cómo medir el tamaño de las solicitudes –

+0

@duhaime BTW, de acuerdo con el motor más reciente [documentos] (http://motor.readthedocs.io/en/stable/examples/bulk.html#bulk-insert) podemos no te preocupes por eso en absoluto –

0

si tiene muchos datos y desea utilizar "_id" para juzgar si existen datos,

puede probar ...

import pymongo 
from pymongo import UpdateOne 
client = pymongo.MongoClient('localhost', 27017) 
db=client['sampleDB'] 

collectionInfo = db.sample 

#sample data 
datas=[ 
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33}, 
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33}, 
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33}, 
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33}, 
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33}, 
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33} 
] 

#you should split judge item and other data 
ids=[data.pop("_id") for data in datas] 

operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)] 

collectionInfo.bulk_write(operations) 

Mi Inglés es muy pobre, si no puede entender lo que digo, lo siento