src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.3 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.4

- old (removed in 0.1.4)
+ new (added in 0.1.4)

@@ -2,22 +2,24 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.avro.generic.GenericData;
+import com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.embulk.config.Config;
 import org.embulk.config.ConfigDefault;
 import org.embulk.config.ConfigDiff;
 import org.embulk.config.ConfigException;
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
 import org.embulk.config.TaskReport;
 import org.embulk.config.TaskSource;
-import org.embulk.spi.ColumnConfig;
 import org.embulk.spi.Exec;
 import org.embulk.spi.OutputPlugin;
 import org.embulk.spi.Page;
 import org.embulk.spi.PageReader;
 import org.embulk.spi.Schema;
@@ -30,11 +32,15 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PrimitiveIterator;
+import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class KafkaOutputPlugin
         implements OutputPlugin
 {
@@ -94,10 +100,14 @@
         @Config("key_column_name")
         @ConfigDefault("null")
         public Optional<String> getKeyColumnName();
 
+        @Config("partition_column_name")
+        @ConfigDefault("null")
+        public Optional<String> getPartitionColumnName();
+
         @Config("record_batch_size")
         @ConfigDefault("1000")
         public int getRecordBatchSize();
 
         @Config("acks")
@@ -122,21 +132,35 @@
     }
 
     private static ObjectMapper objectMapper = new ObjectMapper();
     private Logger logger = LoggerFactory.getLogger(getClass());
 
+    private AdminClient getKafkaAdminClient(PluginTask task)
+    {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, task.getBrokers());
+        AdminClient adminClient = AdminClient.create(properties);
+        return adminClient;
+    }
+
     @Override
     public ConfigDiff transaction(ConfigSource config,
             Schema schema, int taskCount,
             Control control)
     {
         PluginTask task = config.loadConfig(PluginTask.class);
 
+        AdminClient adminClient = getKafkaAdminClient(task);
+        DescribeTopicsResult result = adminClient.describeTopics(ImmutableList.of(task.getTopic()));
+        try {
+            if (result.all().get(30, TimeUnit.SECONDS).size() == 0) {
+                throw new RuntimeException("target topic is not found");
+            }
+        }
+        catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new RuntimeException("failed to connect kafka brokers");
+        }
-        // retryable (idempotent) output:
-        // return resume(task.dump(), schema, taskCount, control);
-
-        // non-retryable (non-idempotent) output:
         control.run(task.dump());
         return Exec.newConfigDiff();
     }
 
     @Override
@@ -177,26 +201,28 @@
         PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
 
         AtomicInteger counter = new AtomicInteger(0);
         AtomicInteger recordLoggingCount = new AtomicInteger(1);
 
         return new TransactionalPageOutput()
         {
+            private JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
+
             @Override
             public void add(Page page)
             {
                 pageReader.setPage(page);
                 while (pageReader.nextRecord()) {
-                    JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
+                    columnVisitor.reset();
                     pageReader.getSchema().visitColumns(columnVisitor);
 
-                    Object recordKey = columnVisitor.recordKey;
+                    Object recordKey = columnVisitor.getRecordKey();
                     if (recordKey == null) {
                         recordKey = randomLong.next();
                     }
 
-                    String targetTopic = columnVisitor.topicName != null ? columnVisitor.topicName : task.getTopic();
-                    ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.jsonNode);
+                    String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
+                    ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.getJsonNode());
                     producer.send(producerRecord, (metadata, exception) -> {
                         if (exception != null) {
                             logger.error("produce error", exception);
                         }
@@ -267,26 +293,28 @@
         AtomicInteger recordLoggingCount = new AtomicInteger(1);
         final org.apache.avro.Schema finalAvroSchema = avroSchema;
 
         return new TransactionalPageOutput()
         {
+            private AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema);
+
             @Override
             public void add(Page page)
             {
                 pageReader.setPage(page);
                 while (pageReader.nextRecord()) {
-                    AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema, new GenericData.Record(finalAvroSchema));
+                    columnVisitor.reset();
                     pageReader.getSchema().visitColumns(columnVisitor);
 
-                    Object recordKey = columnVisitor.recordKey;
+                    Object recordKey = columnVisitor.getRecordKey();
                     if (recordKey == null) {
                         recordKey = randomLong.next();
                     }
 
-                    String targetTopic = columnVisitor.topicName != null ? columnVisitor.topicName : task.getTopic();
+                    String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
 
-                    ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.genericRecord);
+                    ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getGenericRecord());
                     producer.send(producerRecord, (metadata, exception) -> {
                         if (exception != null) {
                             logger.error("produce error", exception);
                         }
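
Note on the new topic pre-flight check: 0.1.4's transaction() asks the cluster, via Kafka's AdminClient, whether the target topic exists before any task runs, and fails fast if it does not. The standalone sketch below exercises the same AdminClient API outside the plugin; the broker address, topic name, and the TopicCheck class itself are placeholders, not code from this repository.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class TopicCheck
    {
        public static void main(String[] args) throws Exception
        {
            Properties props = new Properties();
            // placeholder broker list; the plugin reads this from task.getBrokers()
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // try-with-resources closes the client; the plugin's helper leaves it open
            try (AdminClient adminClient = AdminClient.create(props)) {
                // all() resolves to a Map<String, TopicDescription>; note that with
                // most client versions a missing topic fails the future (wrapped in
                // an ExecutionException from get) rather than yielding an empty map,
                // so the catch branch of the plugin's check is the usual failure path
                Map<String, TopicDescription> topics = adminClient
                        .describeTopics(Collections.singletonList("my-topic"))
                        .all()
                        .get(30, TimeUnit.SECONDS);
                System.out.println("topic found: " + topics.keySet());
            }
        }
    }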
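
Note on visitor reuse: both the JSON and Avro paths previously allocated a fresh column visitor for every record; 0.1.4 hoists the visitor into the anonymous TransactionalPageOutput and calls reset() once per record instead. The plugin's real visitors live in other files of this repository; the class below is a hypothetical, stripped-down illustration of the reuse-and-reset pattern only.

    // Hypothetical simplified visitor; the real JsonFormatColumnVisitor and
    // AvroFormatColumnVisitor carry more per-record state than shown here.
    class ReusableColumnVisitor
    {
        private Object recordKey; // per-record state
        private String topicName; // per-record state

        // clear per-record state so a single instance can serve a whole page
        void reset()
        {
            recordKey = null;
            topicName = null;
        }

        Object getRecordKey()
        {
            return recordKey;
        }

        String getTopicName()
        {
            return topicName;
        }
    }

Reusing one visitor per page output avoids a per-record allocation on large loads; the trade-off is that any field missed by reset() would leak state from one record into the next.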
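
Note on partition routing: the Avro path now builds its ProducerRecord with the four-argument constructor, threading through the partition that the new partition_column_name option ultimately supplies via getPartition(). ProducerRecord(String topic, Integer partition, K key, V value) is a standard overload of the Kafka producer API, and a null partition defers to the configured partitioner. A brief sketch with placeholder topic, key, and value:

    import org.apache.kafka.clients.producer.ProducerRecord;

    class PartitionRoutingExample
    {
        static void demo()
        {
            // explicit partition: this record is pinned to partition 3 of "my-topic"
            ProducerRecord<String, String> pinned =
                    new ProducerRecord<>("my-topic", 3, "key-1", "value-1");

            // null partition: the producer's partitioner chooses
            // (by default, a hash of the key)
            ProducerRecord<String, String> routed =
                    new ProducerRecord<>("my-topic", null, "key-1", "value-1");
        }
    }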