src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.6 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.7
- old
+ new
@@ -37,11 +37,10 @@
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class KafkaOutputPlugin
implements OutputPlugin
{
@@ -219,17 +218,21 @@
if (recordKey == null) {
recordKey = randomLong.next();
}
String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
- ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.getJsonNode());
+ ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getJsonNode());
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("produce error", exception);
}
- logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value());
+ logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}",
+ producerRecord.topic(),
+ producerRecord.key(),
+ producerRecord.value(),
+ producerRecord.partition());
long current = counter.incrementAndGet();
if (current >= recordLoggingCount.get()) {
logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
recordLoggingCount.set(recordLoggingCount.get() * 2);
@@ -318,10 +321,14 @@
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("produce error", exception);
}
- logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value());
+ logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}",
+ producerRecord.topic(),
+ producerRecord.key(),
+ producerRecord.value(),
+ producerRecord.partition());
long current = counter.incrementAndGet();
if (current >= recordLoggingCount.get()) {
logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
recordLoggingCount.set(recordLoggingCount.get() * 2);