2012-05-10 25 views
6

Quiero almacenar matrices Numpy en una base de datos PostgreSQL en forma binaria (bytea). Puedo hacer que esto funcione bien en la prueba n. ° 1 (ver a continuación), pero no quiero tener que manipular las matrices de datos antes de las inserciones y después de las selecciones todas las veces; quiero usar los adaptadores y conversores de psycopg2.Usando un convertidor psycopg2 para recuperar datos bytea de PostgreSQL

Esto es lo que tengo en este momento:

import numpy as np 
import psycopg2, psycopg2.extras 


def my_adapter(spectrum): 
    return psycopg2.Binary(spectrum) 

def my_converter(my_buffer, cursor): 
    return np.frombuffer(my_buffer) 


class MyBinaryTest(): 

    # Connection info 
    user = 'postgres' 
    password = 'XXXXXXXXXX' 
    host = 'localhost' 
    database = 'test_binary' 


    def __init__(self): 
     pass 

    def set_up(self): 

     # Set up 
     connection = psycopg2.connect(host=self.host, user=self.user, password=self.password) 

     connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 

     cursor = connection.cursor() 
     try: # Clear out any old test database 
      cursor.execute('drop database %s' % (self.database,)) 
     except: 
      pass 

     cursor.execute('create database %s' % (self.database,)) 
     cursor.close() 
     connection.close() 

     # Direct connectly to the database and set up our table    
     self.connection = psycopg2.connect(host=self.host, user=self.user, password=self.password, database=self.database) 
     self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor) 

     self.cursor.execute('''CREATE TABLE spectrum (
      "sid" integer not null primary key, 
      "data" bytea not null 
      ); 

      CREATE SEQUENCE spectrum_id; 
      ALTER TABLE spectrum 
       ALTER COLUMN sid 
        SET DEFAULT NEXTVAL('spectrum_id'); 
      ''') 
     self.connection.commit() 



    def perform_test_one(self): 

     # Lets do a test 

     shape = (2, 100) 
     data = np.random.random(shape) 

     # Binary up the data 
     send_data = psycopg2.Binary(data) 

     self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [send_data]) 
     self.connection.commit() 

     # Retrieve the data we just inserted 
     query = self.cursor.execute('select * from spectrum') 
     result = self.cursor.fetchall() 

     print "Type of data retrieved:", type(result[0]['data']) 

     # Convert it back to a numpy array of the same shape 
     retrieved_data = np.frombuffer(result[0]['data']).reshape(*shape) 

     # Ensure there was no problem 
     assert np.all(retrieved_data == data) 
     print "Everything went swimmingly in test one!" 

     return True 

    def perform_test_two(self): 

     if not self.use_adapters: return False 

     # Lets do a test 

     shape = (2, 100) 
     data = np.random.random(shape) 

     # No changes made to the data, as the adapter should take care of it (and it does) 

     self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [data]) 
     self.connection.commit() 

     # Retrieve the data we just inserted 
     query = self.cursor.execute('select * from spectrum') 
     result = self.cursor.fetchall() 

     # No need to change the type of data, as the converter should take care of it 
     # (But, we never make it here) 

     retrieved_data = result[0]['data'] 

     # Ensure there was no problem 
     assert np.all(retrieved_data == data.flatten()) 
     print "Everything went swimmingly in test two!" 

     return True 


    def setup_adapters_and_converters(self): 

     # Set up test adapters 
     psycopg2.extensions.register_adapter(np.ndarray, my_adapter) 

     # Register our converter 
     self.cursor.execute("select null::bytea;") 
     my_oid = self.cursor.description[0][1] 

     obj = psycopg2.extensions.new_type((my_oid,), "numpy_array", my_converter) 
     psycopg2.extensions.register_type(obj, self.connection) 

     self.connection.commit() 

     self.use_adapters = True 


    def tear_down(self): 

     # Tear down 

     self.cursor.close() 
     self.connection.close() 

     connection = psycopg2.connect(host=self.host, user=self.user, password=self.password) 

     connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 

     cursor = connection.cursor() 
     cursor.execute('drop database %s' % (self.database,)) 
     cursor.close() 
     connection.close() 


test = MyBinaryTest() 
test.set_up() 
test.perform_test_one() 
test.setup_adapters_and_converters() 
test.perform_test_two() 
test.tear_down() 

Ahora, la prueba # 1 funciona bien. Cuando tomo el código que he usado en la prueba 1 y configuro un adaptador y convertidor psycopg2, no funciona (prueba 2). Esto se debe a que los datos que se envían al convertidor ya no son un búfer; es la representación de cadena de BygreSQL de PosgreSQL. La salida es la siguiente:

In [1]: run -i test_binary.py 
Type of data retrieved: type 'buffer'> 
Everything went swimmingly in test one! 
ERROR: An unexpected error occurred while tokenizing input 
The following traceback may be corrupted or invalid 
The error message is: ('EOF in multi-line statement', (273, 0)) 

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 

/Users/andycasey/thesis/scope/scope/test_binary.py in <module>() 
    155 test.perform_test_one() 
    156 test.setup_adapters_and_converters() 
--> 157 test.perform_test_two() 
    158 test.tear_down() 
    159 

/Users/andycasey/thesis/scope/scope/test_binary.py in perform_test_two(self) 
    101   # Retrieve the data we just inserted 

    102   query = self.cursor.execute('select * from spectrum') 
--> 103   result = self.cursor.fetchall() 
    104 
    105   # No need to change the type of data, as the converter should take care of it 


/Library/Python/2.6/site-packages/psycopg2/extras.pyc in fetchall(self) 
    81  def fetchall(self): 
    82   if self._prefetch: 
---> 83    res = _cursor.fetchall(self) 
    84   if self._query_executed: 
    85    self._build_index() 

/Users/andycasey/thesis/scope/scope/test_binary.py in my_converter(my_buffer, cursor) 
     7 
     8 def my_converter(my_buffer, cursor): 
----> 9  return np.frombuffer(my_buffer) 
    10 
    11 

ValueError: buffer size must be a multiple of element size 
WARNING: Failure executing file: <test_binary.py> 

In [2]: %debug 
> /Users/andycasey/thesis/scope/scope/test_binary.py(9)my_converter() 
     8 def my_converter(my_buffer, cursor): 
----> 9  return np.frombuffer(my_buffer) 
    10 

ipdb> my_buffer 
'\\x40e67378b9b8ae3f78b15ebecf20ef3f4092f00289dc803f20a843f40b9ddd3f64b6ec99bf62e83f8cea6eb60758d43f2ba47d8e6d5be73f4e88f267bbb2d83ffacc8aad2220d43fc6006b9c7eb7d33ff440cccc638de33f70e0b4b906a1e13fe0eca2af2f87c83f98d31f41e081ee3f1e6f5b8a52fdea3f80fcbd0ec3a0a93f95316c9e462eed3f83fe6d8d2463ea3fb44849fa8404d33f701be5924049df3f6ef3ca0c50f6d63f0c7b7d800cfdda3fc000e89b890c983fb32cf3e4ba1dea3f87f17f7efc06e33f2e194b361190ed3f60e955f0456d933ff24dd5aabc7eeb3f7802405af74ddc3f9ce9c3852db8e03fa0c936267c19d33f3406c35637f9ec3f288d23502e70ee3f08fe67e7ed8ec53f00f5cde29763dc3f26bcb4d362c4e23fa9e01fac6cd8e33fbec912f5ff7ae13f7fbd61e2e585ed3fa0070671e970e83f68ef1f6e0b90da3fce9ce834bfa6d43fa02b825d144e903f42912641e5aedd3f645a299de883db3fd8b5126bb8f6c23f3c5d4ae40ecccd3f5ae503835d00e13fcc784bdb7ea9c43f880ebfb30719be3f1dffcb042f58e23f44cc727ab3dfc53f1bbe477eb861e43f3c4f55f6aea5e53fdc80f6fa91d6e33f12b580ef03acd03f1cb78f8dccaac13f9ebdbd206453d43f32ffc626fe4ddc3f625ff4e2b317d33f44822e2f0d52ca3f38fea7c36ba6cb3ff0290b4707cedc3fd456190f786bcd3f7ed46219b47eda3f66fbdef755c3df3f40ccd47f88978c3f382897872cf5b73f5d24a66af5d7e13f2dd179d56ea3ee3fc4bb5b0962bcd63f20024c1c55ddb63f68a02e5f73fbd13f21eeb68b333de63f1a19dfe1b713e53f7556fedbb698e53f44eb6e9228accf3fe61a509c1d4ae43fe0fb0624828fa83f1822e55e76cdd23f801708ab685dd93f06076be2e92bed3f5ac2ff90247fed3fd42902b6b974d13f9df97b70385ce83fdabc4af1e81fe83f250611249338e73fc0251f9c9739e93f5821b6024621d63f7a7e1fc15605e73fab085fa8bb67e83fb4eb1d087ef5dd3fd1b450d406cbe13f0078ed1c422d3e3f44ed12d19085e83f117d628438daea3f15c776903519e23f747f248fa2e0c83ffcd052e9c4edc93f177a255a0a91e93fbe3b9b894d8edf3fea9fb6dd8be4e23fdc879e88e094e83f18bd28327ae3c03fc1bfd06d0379ec3fe8d7ee7e066ee03f750c4e0f4802e33fca3e4d0e34d3da3fe0578becde30c43f6044d9ad900ed23f08a2562899a3d13f5a83cf6694f3e33f001c61debd5f513fa009953fde2c9a3f29d53b02ca65e53fda066b4421a8ea3f58f074484a08cc3fe239b4b7eb57e03f1f904fe586bde43f9ce6edd599d1d13f43878f622d7ee23fd3ebab4e7904e93f7c3437ad0e16d23fac5e5e9e08a9c83f2b7b2d56db34e73f74f8cd68effeed3f4c279a9d4210c53ffafad9b31886d33f4c3eb4acc9b0dc3f6ed2f82f486edc3fc349273cbe1fec3fe2f70e89b061d83facaa25cb8fdbcd3fb0659c127fb7e83f00a224076b6da43f9ab1eb331dfade3fc86e03757e3bec3f3d00c8545ccce93f90fac6a4cc21b93f08f57560a68bc63fd8cccbabcd13b03fc679c7f9ece6df3f4a8c78aa1a1aed3ffecac18174dbe43fdfe102cffb48e93f0078f7fa27cc463fb40acdaea46ee63f54f754df4daadf3f2a9e063d0ab3da3f82a21b50d3c6d33f1182e48aafb5ed3fb67f3de3b109d63f494258c18422e13f8a5542fc1491e63f43247cbeabece13feb9355572f68eb3f3cf415eee8f1d53f887df6aab75bb43f0042cd907780523ff5e724cad881e03fdb9de04e99ffe43fd6594feb9b75ec3f6d4e6fcf7690e13fabe634f015dee13f584563d26021c93f6f1916ee57c8e13fd8906bad6fa7cd3ff8fad5b03b02eb3f1b3b87c15f16e53f4014ec100f79c73f1aee1302d960d83f45be6b695ed9e13ffc86d1d311dbdb3f089e89e6389fb93f24d742e400cbd63fa048c53d8fbf9c3f6eb1db094d81ed3f8bbf0cba79fde63f70e8f3d63c43c33ff1c5e6fed947e43f64f3a21f062ee03f0d12c4282794e03fa0a3be998572ba3f16510b776d7aeb3fb8c7ca308d2acd3f6f37eb1eb330ef3f1ba1bdb6577fe73f78d805294a05b43f0ed0bea2f180db3f5a4cce890b57ea3f2472556ba6f1e43f1a79fcc20701e53fe2ae8a1ea5f7d73fe0bd1efc12caec3ff94b1e02a75bed3f78e098184e3fea3f46ff0b2344dedb3f1cdc0f7b72efdb3f6ceb0b772b37e43f47e49b2a7088ea3f' 

¿Alguien sabe cómo puedo ya sea (a) de-serializar la representación de cadena volviendo a mí en my_converter así que devolver una matriz Numpy cada vez, o (b) PostgreSQL vigor/psycopg2 para enviar la representación de búfer al convertidor (que puedo usar) en lugar de la representación de cadena?

Gracias!

estoy en OS X 10.6.8 con Python 2.6.1 (R261: 67515), PostgreSQL 9.0.3 y 2.4 psycopg2 (dt ext PQ3 dec)

Respuesta

8

El formato que se ve en el depurador es fácil para analizar: es el formato binario hexagonal de PostgreSQL (http://www.postgresql.org/docs/9.1/static/datatype-binary.html). psycopg puede analizar ese formato y devolver un buffer que contiene los datos; puedes usar ese buffer para obtener una matriz. En lugar de escribir un typecaster desde cero, escriba uno invocando el func original y posprocesando su resultado. Lo siento, pero ahora no puedo recordar su nombre y estoy escribiendo desde un teléfono móvil: puede obtener más ayuda de la lista de correo.


Editar: solución completa.

El valor predeterminado bytea typecaster (que es el objeto que se puede analizar los postgres binario representación y devolver un objeto tampón fuera de él) es psycopg2.BINARY. Podemos utilizarlo para crear un typecaster convertir a matriz en lugar:

In [1]: import psycopg2 

In [2]: import numpy as np 

In [3]: a = np.eye(3) 

In [4]: a 
Out[4]: 
array([[ 1., 0., 0.], 
     [ 0., 1., 0.], 
     [ 0., 0., 1.]]) 

In [5]: cnn = psycopg2.connect('') 


# The adapter: converts from python to postgres 
# note: this only works on numpy version whose arrays 
# support the buffer protocol, 
# e.g. it works on 1.5.1 but not on 1.0.4 on my tests. 

In [12]: def adapt_array(a): 
    ....:  return psycopg2.Binary(a) 
    ....: 

In [13]: psycopg2.extensions.register_adapter(np.ndarray, adapt_array) 


# The typecaster: from postgres to python 

In [21]: def typecast_array(data, cur): 
    ....:  if data is None: return None 
    ....:  buf = psycopg2.BINARY(data, cur) 
    ....:  return np.frombuffer(buf) 
    ....: 

In [24]: ARRAY = psycopg2.extensions.new_type(psycopg2.BINARY.values, 
'ARRAY', typecast_array) 

In [25]: psycopg2.extensions.register_type(ARRAY) 


# Now it works "as expected" 

In [26]: cur = cnn.cursor() 

In [27]: cur.execute("select %s", (a,)) 

In [28]: cur.fetchone()[0] 
Out[28]: array([ 1., 0., 0., 0., 1., 0., 0., 0., 1.]) 

Como saben, np.frombuffer (a) pierde la configuración de red, por lo que tendrá que encontrar una manera de preservarla.

+1

Gracias @piro - I ha he intentado analizar este formato hexadecimal (y el formato de escape PostgreSQL) usando cosas como 'np.frombuffer (buffer (my_buffer [2:]. decode ('hex'), 0, 1600))', pero no he tenido éxito en recuperando la matriz original. Si puede recordar las funciones para analizar este formato, estaría muy agradecido. He preguntado en la lista de correo y están interesados ​​en ver cómo se resuelve este problema. ¡Gracias! –

+0

* golpe * ... ¿Alguien? –

1

En el caso de matrices numpy se puede evitar la estrategia de buffer con todas sus desventajas, como la pérdida de forma y el tipo de datos. Siguiendo un stackoverflow question sobre el almacenamiento de una matriz numpy en sqlite3 uno puede adaptar fácilmente el enfoque para postgres.

import os 
import psycopg2 as psql 
import numpy as np 

# converts from python to postgres 
def _adapt_array(text): 
    out = io.BytesIO() 
    np.save(out, text) 
    out.seek(0) 
    return psql.Binary(out.read()) 

# converts from postgres to python 
def _typecast_array(value, cur): 
    if value is None: 
     return None 

    data = psql.BINARY(value, cur) 
    bdata = io.BytesIO(data) 
    bdata.seek(0) 
    return np.load(bdata) 

con = psql.connect('') 

psql.extensions.register_adapter(np.ndarray, _adapt_array) 
t_array = sql.extensions.new_type(sql.BINARY.values, "numpy", _typecast_array) 
psql.extensions.register_type(t_array) 

cur = con.cursor() 

Ahora uno puede crear y rellenar una tabla (con a definido como en el post anterior)

cur.execute("create table test (column BYTEA)") 
cur.execute("insert into test values(%s)", (a,)) 

y restaurar el objeto numpy

cur.execute("select * from test") 
cur.fetchone()[0] 

Resultado:

array([[ 1., 0., 0.], 
     [ 0., 1., 0.], 
     [ 0., 0., 1.]]) 
Cuestiones relacionadas