2011-09-24 12 views
16

Esto es algo así como un tiro en la oscuridad en caso de que alguien conocedor de la implementación Java de Apache Avro esté leyendo esto.En Java, ¿cómo puedo crear un equivalente de un archivo contenedor Apache Avro sin tener que utilizar un archivo como medio?

Mi objetivo de alto nivel es tener alguna forma de transmitir algunas series de datos avro a través de la red (digamos HTTP por ejemplo, pero el protocolo en particular no es tan importante para este propósito). En mi contexto, tengo una HttpServletResponse. Necesito escribir estos datos de alguna manera.

inicialmente intentó escribir los datos como lo que equivalía a una versión virtual de un archivo contenedor Avro (suponer que la "respuesta" es de tipo HttpServletResponse):

response.setContentType("application/octet-stream"); 
response.setHeader("Content-transfer-encoding", "binary"); 
ServletOutputStream outStream = response.getOutputStream(); 
BufferedOutputStream bos = new BufferedOutputStream(outStream); 

Schema someSchema = Schema.parse(".....some valid avro schema...."); 
GenericRecord someRecord = new GenericData.Record(someSchema); 
someRecord.put("somefield", someData); 
... 

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema); 
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
fileWriter.create(someSchema, bos); 
fileWriter.append(someRecord); 
fileWriter.close(); 
bos.flush(); 

Esto era todo lo fino y elegante, con la excepción que resulta que en realidad no se Avro proporcionar una manera de leer un archivo contenedor aparte de un archivo real: el DataFileReader solamente tiene dos constructores:

public DataFileReader(File file, DatumReader<D> reader); 

y

public DataFileReader(SeekableInput sin, DatumReader<D> reader); 

donde SeekableInput es una forma personalizada específica de avro cuya creación también termina leyendo de un archivo. Ahora bien, dado que, a menos que haya alguna forma de forzar de algún modo un InputStream en un archivo (http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a sugiere que no existe, y he intentado buscar en la documentación de Java también), este enfoque no funcionará si el lector en el otro extremo de OutputStream recibe ese archivo contenedor avro (No estoy seguro de por qué permitieron que uno generara archivos contenedores avro binarios en un OutputStream arbitrario sin proporcionar una manera de leerlos desde el InputStream correspondiente en el otro extremo, pero eso está al lado del punto). Parece que la implementación del lector de archivos contenedores requiere la funcionalidad "buscable" que proporciona un archivo concreto.

De acuerdo, así que no parece que ese enfoque hará lo que quiero. ¿Qué tal crear una respuesta JSON que imite el archivo contenedor avro?

public static Schema WRAPPER_SCHEMA = Schema.parse(
    "{\"type\": \"record\", " + 
    "\"name\": \"AvroContainer\", " + 
    "\"doc\": \"a JSON avro container file\", " + 
    "\"namespace\": \"org.bar.foo\", " + 
    "\"fields\": [" + 
    "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " + 
    "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}" 
); 

No estoy seguro si esto es la mejor manera de abordar esto, dadas las limitaciones anteriores, pero parece que esto podría hacer el truco. Pondré el esquema (de "Schema someSchema" desde arriba, por ejemplo) como una Cadena dentro del campo "esquema", y luego coloco en la forma avro-binaria-serializada de un registro que corresponde a ese esquema (es decir, "GenericRecord" someRecord ") dentro del campo" datos ".

En realidad, quería saber acerca de un detalle específico de lo que se describe a continuación, pero pensé que valdría la pena dar un contexto más grande también, por lo que si hay un mejor enfoque de alto nivel que podría estar tomando (Este enfoque funciona, pero simplemente no se siente óptimo) por favor házmelo saber.

Mi pregunta es, suponiendo que vaya con este enfoque basado en JSON, ¿cómo escribo la representación binaria avro de mi registro en el campo "datos" del esquema de AvroContainer? Por ejemplo, llegué hasta aquí:

ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema); 
Encoder e = new BinaryEncoder(baos); 
datumWriter.write(resultsRecord, e); 
e.flush(); 

GenericRecord someRecord = new GenericData.Record(someSchema); 
someRecord.put("schema", someSchema.toString()); 
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray())); 
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA); 
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8); 
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator); 
datumWriter.write(someRecord, e); 
e.flush(); 

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse 
response.setContentType("text/plain"); 
response.setCharacterEncoding("UTF-8"); 
printWriter.print(baos.toString("UTF-8")); 

inicialmente traté omitiendo la cláusula ByteBuffer.wrap, pero entonces la línea

datumWriter.write(someRecord, e); 

inició una excepción que no podía emitir una matriz de bytes en ByteBuffer.Bastante, parece que cuando se llama a la clase Encoder (de la cual JsonEncoder es una subclase) para escribir un objeto avro Bytes, se requiere un ByteBuffer como argumento. Por lo tanto, he intentado encapsular el byte [] con java.nio.ByteBuffer.wrap, pero cuando los datos se imprimen, se imprimió como una serie lineal de bytes, sin ser pasado a través de la representación hexadecimal Avro:

"data": {"bytes": ".....some gibberish other than the expected format...} 

Eso no parece correcto. De acuerdo con la documentación de avro, el objeto de bytes de ejemplo que dan dice que necesito poner un objeto json, cuyo ejemplo parece "\ u00FF", y lo que he puesto allí claramente no es de ese formato. Lo que ahora quiero saber es lo siguiente:

  • ¿Qué es un ejemplo de formato de bytes avro? ¿Se ve algo así como "\ uDEADBEEFDEADBEEF ..."?
  • ¿Cómo forzar mis datos avro binarios (como los genera el BinaryEncoder en una matriz byte []) en un formato que puedo incluir en el objeto GenericRecord y hacer que se imprima correctamente en JSON? Por ejemplo, quiero un Object DATA para el que pueda llamar en algunos registros genéricos "someRecord.put (" data ", DATA);" con mi avro datos serializados adentro?
  • ¿Cómo volvería a leer esos datos en una matriz de bytes en el otro extremo (consumidor), cuando se le dé la representación JSON de texto y desee recrear el registro genérico representado por el formato Jro de AvroContainer?
  • (reiterando la pregunta anterior) ¿Hay alguna manera mejor de que yo pueda estar haciendo todo esto?
+1

org.apache.avro.file.DataFileStream? – Chikei

+3

SeekableInput no es solo una forma personalizada específica de avro cuya creación termina leyendo de un archivo. Hay [SeekableByteArrayInput] (http://avro.apache.org/docs/current/api/java/org/apache/avro/file/SeekableByteArrayInput.html) que se lee desde una matriz de bytes en la memoria. –

+0

Muy buena pregunta, y el requisito de necesitar acceso aleatorio es muy extraño, ya que es imposible satisfacerlo sin una gran cantidad de memoria intermedia. Y, sin embargo, parece innecesario hacerlo también ... No sé por qué se sintió que se necesitaba acceso aleatorio. Muchos otros formatos de datos no agregan tales requisitos para el procesamiento. – StaxMan

Respuesta

1

Como se dijo Knut, si desea utilizar algo que no sea un archivo, puede:

  • uso SeekableByteArrayInput, como dijo Knut, para cualquier cosa que pueda calzador en una matriz de bytes
  • Implemente SeekablInput a su manera; por ejemplo, si lo sacó de una estructura de base de datos extraña.
  • O simplemente use un archivo. Por qué no?

Esas son sus respuestas.

+0

Impresionante, eso es exactamente lo que necesitaba. –

+4

Además, el uso de un archivo aumenta la sobrecarga para la E/S de disco, por lo que si está recibiendo una matriz de bytes a través de la red, primero no quiere ponerla en un archivo y luego leerla (¡viaje de E/S de disco! !!). –

0

La forma en que resolví esto fue enviar los esquemas por separado de los datos. Configuré un saludo de conexión que transmite los esquemas desde el servidor, luego envío datos codificados de un lado a otro. Hay que crear un objeto envoltorio exterior de esta manera:

{'name':'Wrapper','type':'record','fields':[ 
    {'name':'schemaName','type':'string'}, 
    {'name':'records','type':{'type':'array','items':'bytes'}} 
]} 

Donde primera codificar el conjunto de registros, uno por uno, en una matriz de matrices de bytes codificados. Todo en una matriz debe tener el mismo esquema. A continuación, codifica el objeto contenedor con el esquema anterior: configure "schemaName" para que sea el nombre del esquema que utilizó para codificar la matriz.

En el servidor, decodificará primero el objeto envoltorio. Una vez que decodifica el objeto envoltorio, conoce el nombre del esquema, y ​​usted tiene una matriz de objetos que sabe cómo decodificar: ¡use como quiera!

Tenga en cuenta que se puede salir sin utilizar el objeto envoltorio si se utiliza un protocolo como WebSockets y un motor como Socket.IO (por Node.js) Socket.io le da una capa de comunicación basado en canales entre el navegador y el servidor. En ese caso, solo use un esquema específico para cada canal, codifique cada mensaje antes de enviarlo. Aún debe compartir los esquemas cuando se inicia la conexión, pero si está usando WebSockets, esto es fácil de implementar.Y cuando haya terminado, tendrá un número arbitrario de transmisiones bidireccionales fuertemente tipadas entre el cliente y el servidor.

+0

Aunque no es una mala solución, ni siquiera se acerca a abordar la pregunta planteada por el OP. – rbellamy

0

En Java y Scala, hemos intentado utilizar la creación a través del código generado con Scala nitro codegen. Inception es cómo la biblioteca Javascript mtth/avsc resolvió esto problem. Sin embargo, nos encontramos con varios problemas de serialización al utilizar la biblioteca Java donde se inyectaban bytes erróneos en la secuencia de bytes, de forma consistente, y podíamos averiguar no de dónde provenían esos bytes.

Por supuesto, eso significaba construir nuestra propia implementación de Varint con codificación ZigZag. Meh.

Aquí está:

package com.terradatum.query 

import java.io.ByteArrayOutputStream 
import java.nio.ByteBuffer 
import java.security.MessageDigest 
import java.util.UUID 

import akka.actor.ActorSystem 
import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 
import com.nitro.scalaAvro.runtime.GeneratedMessage 
import com.terradatum.diagnostics.AkkaLogging 
import org.apache.avro.Schema 
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} 
import org.apache.avro.io.EncoderFactory 
import org.elasticsearch.search.SearchHit 

import scala.collection.mutable.ArrayBuffer 
import scala.reflect.ClassTag 

/* 
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create 
* the header. That didn't work for us because somehow erroneous bytes were injected into the output. 
* 
* Specifically: 
* 1. 0x08 prepended to the magic 
* 2. 0x0020 between the header and the sync marker 
* 
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such 
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code. 
* 
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and 
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro 
* records. 
* 
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files 
*/ 
object AvroContainerFileHelpers { 

    val magic: ByteBuffer = { 
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte) 
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes) 
    mg.position(0) 
    mg 
    } 

    def makeSyncMarker(): Array[Byte] = { 
    val digester = MessageDigest.getInstance("MD5") 
    digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes) 
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact() 
    marker.position(0) 
    marker.array() 
    } 

    /* 
    * Note that other implementations of avro container files, such as the javascript library 
    * mtth/avsc uses "inception" to encode the header, that is, a datum following a header 
    * schema should produce valid headers. We originally had attempted to do the same but for 
    * an unknown reason two bytes wore being inserted into our header, one at the very beginning 
    * of the header before the MAGIC marker, and one right before the syncmarker of the header. 
    * We were unable to determine why this wasn't working, and so this solution was used instead 
    * where the record/map is encoded per the avro spec manually without the use of "inception." 
    */ 
    def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = { 
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = { 
     val mapBytes = map.flatMap { 
     case (k, vBuff) => 
      val v = vBuff.array() 
      val byteStr = k.getBytes() 
      Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v 
     } 
     Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0) 
    } 

    val schemaBytes = schema.toString.getBytes 
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes) 
    schemaBuffer.position(0) 
    val metadata = Map("avro.schema" -> schemaBuffer) 
    magic.array() ++ avroMap(metadata) ++ syncMarker 
    } 

    def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = { 
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong) 
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong) 

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]() 

    buff.append(countBytes:_*) 
    buff.append(sizeBytes:_*) 
    binaryRecords.foreach { rec => 
     buff.append(rec:_*) 
    } 
    buff.append(syncMarker:_*) 

    buff.toArray 
    } 

    def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = { 
    //block(records.map(encodeRecord(schema, _)), syncMarker) 
    val writer = new GenericDatumWriter[GenericRecord](schema) 
    val out = new ByteArrayOutputStream() 
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null) 
    records.foreach(record => writer.write(record, binaryEncoder)) 
    binaryEncoder.flush() 
    val flattenedRecords = out.toByteArray 
    out.close() 

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]() 

    val countBytes = Varint.encodeLong(records.length.toLong) 
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong) 

    buff.append(countBytes:_*) 
    buff.append(sizeBytes:_*) 
    buff.append(flattenedRecords:_*) 
    buff.append(syncMarker:_*) 

    buff.toArray 
    } 

    def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
     entity: R 
): Array[Byte] = 
    encodeRecord(entity.companion.schema, entity.toMutable) 

    def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = { 
    val writer = new GenericDatumWriter[GenericRecord](schema) 
    val out = new ByteArrayOutputStream() 
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null) 
    writer.write(record, binaryEncoder) 
    binaryEncoder.flush() 
    val bytes = out.toByteArray 
    out.close() 
    bytes 
    } 
} 

/** 
    * Encoding of integers with variable-length encoding. 
    * 
    * The avro specification uses a variable length encoding for integers and longs. 
    * If the most significant bit in a integer or long byte is 0 then it knows that no 
    * more bytes are needed, if the most significant bit is 1 then it knows that at least one 
    * more byte is needed. In signed ints and longs the most significant bit is traditionally 
    * used to represent the sign of the integer or long, but for us it's used to encode whether 
    * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that 
    * negatives are odd numbers and positives are even numbers: 
    * 
    * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on 
    * while 1, 2, 3 would be encoded as 2, 4, 6, and so on. 
    * 
    * More information is available in the avro specification here: 
    * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt 
    *  https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types 
    */ 
object Varint { 

    import scala.collection.mutable 

    def encodeLong(longVal: Long): Array[Byte] = { 
    val buff = new ArrayBuffer[Byte]() 
    Varint.zigZagSignedLong(longVal, buff) 
    buff.toArray[Byte] 
    } 

    def encodeInt(intVal: Int): Array[Byte] = { 
    val buff = new ArrayBuffer[Byte]() 
    Varint.zigZagSignedInt(intVal, buff) 
    buff.toArray[Byte] 
    } 

    def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = { 
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types 
    writeUnsignedLong((x << 1)^(x >> 63), dest) 
    } 

    def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = { 
    var x = v 
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) { 
     dest += ((x & 0x7F) | 0x80).toByte 
     x >>>= 7 
    } 
    dest += (x & 0x7F).toByte 
    } 

    def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = { 
    writeUnsignedInt((x << 1)^(x >> 31), dest) 
    } 

    def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = { 
    var x = v 
    while ((x & 0xFFFFF80) != 0L) { 
     dest += ((x & 0x7F) | 0x80).toByte 
     x >>>= 7 
    } 
    dest += (x & 0x7F).toByte 
    } 
} 
Cuestiones relacionadas