2012-07-15 17 views
6

Empecé a jugar con Kafka. Establecí una configuración de cuidador de zoológico y logré enviar y consumir mensajes de cadena. Ahora estoy tratando de pasar un objeto (en java), pero por alguna razón, al analizar el mensaje en el consumidor tengo problemas con el encabezado. Intenté varias opciones de serialización (usando Decoder/Encoder), y todas devuelven el mismo problema de encabezado.Kafka Serialización de un objeto

Aquí está mi código El productor:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config); 
     ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails()); 
     try { 
      producer.send(data); 
     } finally { 
      producer.close(); 
     } 

Y el consumidor:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("zk.connectiontimeout.ms", "1000000"); 
     props.put("groupid", "test_group"); 

     // Create the connection to the cluster 
     ConsumerConfig consumerConfig = new ConsumerConfig(props); 
     ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 

     // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume 
     Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams = 
       consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer()); 
     List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3"); 

     // create list of 4 threads to consume from each of the partitions 
     ExecutorService executor = Executors.newFixedThreadPool(4); 

     // consume the messages in the threads 
     for (final KafkaMessageStream<EventDetails> stream: streams) { 
      executor.submit(new Runnable() { 
       public void run() { 
        for(EventDetails event: stream) { 
         System.err.println("********** Got message" + event.toString());   
        } 
       } 
      }); 
     } 

y mi Serializador:

public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> { 
    public Message toMessage(EventDetails eventDetails) { 
     try { 
      ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
      byte[] serialized = mapper.writeValueAsBytes(eventDetails); 
      return new Message(serialized); 
} catch (IOException e) { 
      e.printStackTrace(); 
      return null; // TODO 
     } 
} 
    public EventDetails toEvent(Message message) { 
     EventDetails event = new EventDetails(); 

     ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
     try { 
      //TODO handle error 
      return mapper.readValue(message.payload().array(), EventDetails.class); 
     } catch (IOException e) { 
      e.printStackTrace(); 
      return null; 
     } 

    } 
} 

Y este es el error que consigo:

org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse 
at [Source: N/A; line: -1, column: -1] 

Cuando trabajé con MessagePack y con escritura simple en ObjectOutputStream recibí un problema de encabezado similar. También traté de agregar la carga útil CRC32 al mensaje, pero eso tampoco ayudó.

¿Qué estoy haciendo mal aquí?

Respuesta

1

Bytebuffer El método .array() no es muy confiable. Depende de la implementación particular. Es posible que desee probar

ByteBuffer bb = message.payload() 

byte[] b = new byte[bb.remaining()] 
bb.get(b, 0, b.length); 
return mapper.readValue(b, EventDetails.class) 
+0

¡Gracias, esto resolvió un problema muy similar que estaba teniendo! – Jarmex

3

Hm, no se han encontrado con el mismo problema de cabecera que se está encontrando, pero mi proyecto no estaba compilando correctamente cuando no le proporcionamos un constructor VerifiableProperties en mi codificador/decodificador . Sin embargo, parece extraño que el constructor faltante corrompería la deserialización de Jackson.

Quizás intente dividir su codificador y decodificador e incluya el constructor VerifiableProperties en ambos; no debería necesitar implementar Decoder[T] para la serialización. Pude implementar con éxito json de/serialization usando ObjectMapper siguiendo el formato en this post.

Buena suerte!

Cuestiones relacionadas