src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.4 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.5

- old
+ new

@@ -38,10 +38,11 @@ import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class KafkaOutputPlugin implements OutputPlugin { public enum RecordSerializeFormat @@ -197,12 +198,12 @@ { KafkaProducer<Object, ObjectNode> producer = RecordProducerFactory.getForJson(task, schema, task.getOtherProducerConfigs()); PageReader pageReader = new PageReader(schema); PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator(); - AtomicInteger counter = new AtomicInteger(0); - AtomicInteger recordLoggingCount = new AtomicInteger(1); + AtomicLong counter = new AtomicLong(0); + AtomicLong recordLoggingCount = new AtomicLong(1); return new TransactionalPageOutput() { private JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper); @Override @@ -226,10 +227,10 @@ logger.error("produce error", exception); } logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value()); - int current = counter.incrementAndGet(); + long current = counter.incrementAndGet(); if (current >= recordLoggingCount.get()) { logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current); recordLoggingCount.set(recordLoggingCount.get() * 2); } });