2011-11-28 25 views
21

Estoy tratando de usar Avro para los mensajes que se leen/escriben en Kafka. ¿Alguien tiene un ejemplo de usar el codificador binario Avro para codificar/decodificar datos que se colocarán en una cola de mensajes?Cómo codificar/decodificar mensajes Kafka usando el codificador binario Avro?

Necesito la parte de Avro más que la parte de Kafka. O, tal vez debería ver una solución diferente? Básicamente, estoy tratando de encontrar una solución más eficiente para JSON con respecto al espacio. Avro fue mencionado ya que puede ser más compacto que JSON.

Respuesta

11

Finalmente me acordé de preguntar en la lista de correo de Kafka y obtuve lo siguiente como respuesta, que funcionó a la perfección.

Sí, puede enviar mensajes como matrices de bytes. Si nos fijamos en el constructor de la clase de mensaje, verá -

def esto (bytes: Matriz [Byte])

Ahora, mirando el envío Productor() de la API -

def send (producerData: ProducerData [K, V] *)

Puede configurar V para que sea del tipo Message y K a lo que quiere que sea su clave. Si no le interesa la creación de particiones con una clave, configúrelo también en el tipo de mensaje .

Gracias, Neha

2

En vez de Avro, usted podría también considerar simplemente la compresión de datos; ya sea con gzip (buena compresión, mayor CPU) o LZF o Snappy (mucho más rápido, compresión más lenta).

O, alternativamente, también hay Smile binary JSON, apoyado en Java por Jackson (con this extension): se trata de formato compacto binario, y mucho más fácil de usar que Avro:

ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
byte[] serialized = mapper.writeValueAsBytes(pojo); 
// or back 
SomeType pojo = mapper.readValue(serialized, SomeType.class); 

básicamente mismo código que con JSON, excepto para pasar fábrica de formato diferente. Desde la perspectiva del tamaño de datos, si Smile o Avro es más compacto depende de los detalles del caso de uso; pero ambos son más compactos que JSON.

El beneficio es que esto funciona rápido con JSON y Smile, con el mismo código, utilizando solo POJO. Comparado con Avro que requiere generación de código o muchos códigos manuales para empaquetar y descomprimir GenericRecord s.

7

Si desea obtener una matriz de bytes de un mensaje de Avro (la parte kafka ya está contestada), utilice el codificador binario:

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try { 
     Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
     writer.write(record, e); 
     e.flush(); 
     byte[] byteData = os.toByteArray(); 
    } finally { 
     os.close(); 
    } 
+0

¿Puede enviar este byteData al KafkaBroker y leerlo desde el consumidor de la consola? ¿Cuál debería ser el serializador de claves de Producer? – user2441441

+0

Como se menciona en la respuesta, la parte kafka está documentada en otras respuestas - http://stackoverflow.com/a/8348264/5266 y http://stackoverflow.com/a/32341917/5266 –

12

Este es un ejemplo básico. No lo he intentado con múltiples particiones/temas.

// Ejemplo de código del productor

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.*; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.avro.specific.SpecificDatumWriter; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import java.util.Properties; 


public class ProducerTest { 

    void producer(Schema schema) throws IOException { 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "0:9092"); 
     props.put("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, byte[]> producer = new Producer<String, byte[]>(config); 
     GenericRecord payload1 = new GenericData.Record(schema); 
     //Step2 : Put data in that genericrecord object 
     payload1.put("desc", "'testdata'"); 
     //payload1.put("name", "अasa"); 
     payload1.put("name", "dbevent1"); 
     payload1.put("id", 111); 
     System.out.println("Original Message : "+ payload1); 
     //Step3 : Serialize the object to a bytearray 
     DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload1, encoder); 
     encoder.flush(); 
     out.close(); 

     byte[] serializedBytes = out.toByteArray(); 
     System.out.println("Sending message in bytes : " + serializedBytes); 
     //String serializedHex = Hex.encodeHexString(serializedBytes); 
     //System.out.println("Serialized Hex String : " + serializedHex); 
     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes); 
     producer.send(message); 
     producer.close(); 

    } 


    public static void main(String[] args) throws IOException, DecoderException { 
     ProducerTest test = new ProducerTest(); 
     Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
     test.producer(schema); 
    } 
} 

// Ejemplo de código de consumo

Parte 1: código del grupo del consumidor: como se puede tener más de varios consumidores para múltiples particiones/temas.

import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* Created by on 9/1/15. 
*/ 
public class ConsumerGroupExample { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){ 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId)); 
     this.topic = a_topic; 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){ 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 

     return new ConsumerConfig(props); 
    } 

    public void shutdown(){ 
     if (consumer!=null) consumer.shutdown(); 
     if (executor!=null) executor.shutdown(); 
     System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     try{ 
      if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){ 

      } 
     }catch(InterruptedException e){ 
      System.out.println("Interrupted"); 
     } 

    } 


    public void run(int a_numThreads){ 
     //Make a map of topic as key and no. of threads for that topic 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     //Create message streams for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

     //initialize thread pool 
     executor = Executors.newFixedThreadPool(a_numThreads); 
     //start consuming from thread 
     int threadNumber = 0; 
     for (final KafkaStream stream : streams) { 
      executor.submit(new ConsumerTest(stream, threadNumber)); 
      threadNumber++; 
     } 
    } 
    public static void main(String[] args) { 
     String zooKeeper = args[0]; 
     String groupId = args[1]; 
     String topic = args[2]; 
     int threads = Integer.parseInt(args[3]); 

     ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
     example.run(threads); 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     example.shutdown(); 
    } 


} 

Parte 2: Consumidor individual que realmente consume los mensajes.

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.message.MessageAndMetadata; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.generic.IndexedRecord; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.Decoder; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.commons.codec.binary.Hex; 

import java.io.File; 
import java.io.IOException; 

public class ConsumerTest implements Runnable{ 

    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run(){ 
     ConsumerIterator<byte[], byte[]>it = m_stream.iterator(); 
     while(it.hasNext()) 
     { 
      try { 
       //System.out.println("Encoded Message received : " + message_received); 
       //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray()); 
       //System.out.println("Deserializied Byte array : " + input); 
       byte[] received_message = it.next().message(); 
       System.out.println(received_message); 
       Schema schema = null; 
       schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
       DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null); 
       GenericRecord payload2 = null; 
       payload2 = reader.read(null, decoder); 
       System.out.println("Message received : " + payload2); 
      }catch (Exception e) { 
       e.printStackTrace(); 
       System.out.println(e); 
      } 
     } 

    } 


} 

prueba AVRO esquema:

{ 
    "namespace": "xyz.test", 
    "type": "record", 
    "name": "payload", 
    "fields":[ 
     { 
      "name": "name", "type": "string" 
     }, 
     { 
      "name": "id", "type": ["int", "null"] 
     }, 
     { 
      "name": "desc", "type": ["string", "null"] 
     } 
    ] 
} 

cosas importantes a tener en cuenta son:

  1. usted necesitará el kafka estándar y frascos Avro para ejecutar el código de la caja.

  2. Es muy importante props.put ("serializer.class", "kafka.serializer.DefaultEncoder"); Don t use stringEncoder as that won t funciona si está enviando una matriz de bytes como mensaje.

  3. Puede convertir el byte [] en una cadena hexadecimal y enviar eso y en el consumidor reconvierta la cadena hexadecimal a byte [] y luego al mensaje original.

  4. Ejecute el cuidador de zoo y el intermediario como se menciona aquí: - http://kafka.apache.org/documentation.html#quickstart y cree un tema llamado "page_views" o lo que desee.

  5. Ejecute ProducerTest.java y luego ConsumerGroupExample.java y vea los datos de Avro producidos y consumidos.

+0

¡Gracias por la ayuda! ! Intenté esto, pero en el código del consumidor mi función it.hasNext() devuelve false para que el control nunca entre al ciclo while. ¿Hay alguna idea de qué puedo estar haciendo mal? –

3

Respuesta actualizada.

Kafka tiene un serializador Avro/deserializer con Maven (SBT formateado) coordina:

"io.confluent" % "kafka-avro-serializer" % "3.0.0" 

pasa una instancia de KafkaAvroSerializer al constructor KafkaProducer.

Luego puede crear instancias de Avro GenericRecord y usarlas como valores dentro de las instancias de Kafka ProducerRecord que puede enviar con KafkaProducer.

En el lado del consumidor de Kafka, utiliza KafkaAvroDeserializer y KafkaConsumer.

+0

¿Sería capaz de proporcionar un ejemplo breve pero completo? –

+1

Esto solo funciona con el repositorio Maven de Confluent agregado, ya que no publican los artefactos en maven central: http://packages.confluent.io/maven –

Cuestiones relacionadas