src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.5 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.6
- old
+ new
@@ -288,12 +288,12 @@
}
}
PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
- AtomicInteger counter = new AtomicInteger(0);
- AtomicInteger recordLoggingCount = new AtomicInteger(1);
+ AtomicLong counter = new AtomicLong(0);
+ AtomicLong recordLoggingCount = new AtomicLong(1);
final org.apache.avro.Schema finalAvroSchema = avroSchema;
return new TransactionalPageOutput()
{
private AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema);
@@ -320,10 +320,10 @@
logger.error("produce error", exception);
}
logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value());
- int current = counter.incrementAndGet();
+ long current = counter.incrementAndGet();
if (current >= recordLoggingCount.get()) {
logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
recordLoggingCount.set(recordLoggingCount.get() * 2);
}
});