Sha256: ec80bc135aa8fd7ef376a152c5f10aa356cc266104dc89d774e49c53b769105c

Contents?: true

Size: 1.13 KB

Versions: 1

Compression:

Stored size: 1.13 KB

Contents

package org.embulk.input.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Kafka {@link Deserializer} that parses a record payload as JSON and yields it only when
 * the top-level value is a JSON object.
 *
 * <p>Payloads that are {@code null}, that cannot be parsed, or whose top-level value is not
 * an object are reported as {@code null}; the latter two cases are also logged at WARN level.
 */
public class KafkaJsonDeserializer implements Deserializer<ObjectNode>
{
  private static final Logger logger = LoggerFactory.getLogger(KafkaJsonDeserializer.class);

  // Built once and reused for every record instead of being re-created per call.
  private static final ObjectMapper mapper = new ObjectMapper()
      .registerModules(new Jdk8Module(), new JavaTimeModule());

  /**
   * Parses {@code data} as JSON.
   *
   * @param topic topic the record was read from; used only in log output
   * @param data  raw record payload, may be {@code null}
   * @return the parsed object, or {@code null} when {@code data} is {@code null}, is not
   *         parseable JSON, or its top-level value is not a JSON object
   */
  @Override
  public ObjectNode deserialize(String topic, byte[] data)
  {
    // A record may carry no payload at all; there is nothing to parse in that case,
    // and handing null to the mapper would raise an unchecked exception.
    if (data == null) {
      return null;
    }

    try {
      JsonNode jsonNode = mapper.readTree(data);
      // instanceof rather than isObject(): also safe should readTree hand back null.
      if (jsonNode instanceof ObjectNode) {
        return (ObjectNode) jsonNode;
      }
      // Decode the bytes so the log shows the payload text, not the array's identity.
      logger.warn("Ignore current record that is not an object: {}",
          new String(data, StandardCharsets.UTF_8));
      return null;
    }
    catch (IOException e) {
      // Best-effort: skip the record, but route the failure through the logger
      // (with stack trace) instead of printing to stderr.
      logger.warn("Ignore current record that could not be parsed as JSON (topic: {})", topic, e);
      return null;
    }
  }
}

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
embulk-input-kafka-0.1.0 src/main/java/org/embulk/input/kafka/KafkaJsonDeserializer.java