src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.5 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.6

- old
+ new

@@ -288,12 +288,12 @@ } } PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator(); - AtomicInteger counter = new AtomicInteger(0); - AtomicInteger recordLoggingCount = new AtomicInteger(1); + AtomicLong counter = new AtomicLong(0); + AtomicLong recordLoggingCount = new AtomicLong(1); final org.apache.avro.Schema finalAvroSchema = avroSchema; return new TransactionalPageOutput() { private AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema); @@ -320,10 +320,10 @@ logger.error("produce error", exception); } logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value()); - int current = counter.incrementAndGet(); + long current = counter.incrementAndGet(); if (current >= recordLoggingCount.get()) { logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current); recordLoggingCount.set(recordLoggingCount.get() * 2); } });