2011-07-20 24 views
8

Estoy escribiendo un script para hacer una copia de algunos datos entre dos máquinas en la misma red usando psycopg2. Estoy reemplazando una vieja fea fiesta, que hace la copia contubería postgres COPIA en python con psycopg2

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN" 

Esto parece tanto a la forma más simple y most efficient hacer la copia. Es fácil de replicar en pitón con un Stringio o un archivo temporal, así:

buf = StringIO() 

from_curs = from_conn.cursor() 
to_curs  = to_conn.cursor() 

from_curs.copy_expert("COPY table TO STDOUT", buf) 
buf.seek(0, os.SEEK_SET) 
to_curs.copy_expert("COPY table FROM STDIN", buf) 

... pero eso implica guardar todos los datos en el disco/en la memoria.

¿Alguien ha encontrado una forma de imitar el comportamiento de una tubería Unix en una copia como esta? Parece que no puedo encontrar un objeto Unix-pipe que no implique POpen. Tal vez la mejor solución es simplemente usar POpen y subprocesar, después de todo.

+0

curioso es trabajado por debajo de la solución? – agf

Respuesta

0

Se puede usar un doble cola que ha subclases para apoyar la lectura y la escritura:

from collections import deque 
from Exceptions import IndexError 

class DequeBuffer(deque): 
    def write(self, data): 
     self.append(data) 
    def read(self): 
     try: 
      return self.popleft() 
     except IndexError: 
      return '' 

buf = DequeBuffer() 

Si el lector es mucho más rápido que el escritor, y la tabla es grande, la deque seguirá recibiendo grandes, pero será más pequeño que almacenar todo.

Además, no estoy seguro de return '' cuando el deque está vacío es seguro, en lugar de volver a intentarlo hasta que no esté vacío, pero supongo que sí. Déjame saber si funciona.

Recuerde del buf cuando esté seguro de que la copia está lista, especialmente si la secuencia de comandos no acaba de salir en ese momento.

12

Tendrá que poner una de sus llamadas en una secuencia separada. Me he dado cuenta de que puede utilizar os.pipe(), lo que hace el resto bastante sencillo:

#!/usr/bin/python 
import psycopg2 
import os 
import threading 

fromdb = psycopg2.connect("dbname=from_db") 
todb = psycopg2.connect("dbname=to_db") 

r_fd, w_fd = os.pipe() 

def copy_from(): 
    cur = todb.cursor() 
    cur.copy_from(os.fdopen(r_fd), 'table') 
    cur.close() 
    todb.commit() 

to_thread = threading.Thread(target=copy_from) 
to_thread.start() 

cur = fromdb.cursor() 
write_f = os.fdopen(w_fd, 'w') 
cur.copy_to(write_f, 'table') 
write_f.close() # or deadlock... 

to_thread.join() 
+0

¡Esta es una gran solución! Tengo curiosidad por saber por qué fue necesario introducir un objeto Thread? – Demitri

+3

@Demitri, 'copy_from()' y 'copy_to()' son comandos de bloqueo; no regresan hasta que la operación haya terminado. Si hiciéramos la primera llamada en el hilo principal, simplemente esperaríamos los datos en la tubería y nunca recuperaríamos el control para hacer la otra llamada. –

+0

Ah, ya veo. Todavía se bloqueará en el nuevo hilo, pero permitirá que el hilo principal alimente el tubo cuando lo desee. Gracias. – Demitri

Cuestiones relacionadas