src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.6 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.7

- old
+ new

@@ -37,11 +37,10 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class KafkaOutputPlugin implements OutputPlugin { @@ -219,17 +218,21 @@ if (recordKey == null) { recordKey = randomLong.next(); } String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic(); - ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.getJsonNode()); + ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getJsonNode()); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { logger.error("produce error", exception); } - logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value()); + logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}", + producerRecord.topic(), + producerRecord.key(), + producerRecord.value(), + producerRecord.partition()); long current = counter.incrementAndGet(); if (current >= recordLoggingCount.get()) { logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current); recordLoggingCount.set(recordLoggingCount.get() * 2); @@ -318,10 +321,14 @@ producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { logger.error("produce error", exception); } - logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value()); + logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}", + producerRecord.topic(), + producerRecord.key(), + producerRecord.value(), + producerRecord.partition()); long current = counter.incrementAndGet(); if (current >= recordLoggingCount.get()) { logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current); recordLoggingCount.set(recordLoggingCount.get() * 2);