src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.4 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.5
- old
+ new
@@ -38,10 +38,11 @@
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class KafkaOutputPlugin
implements OutputPlugin
{
public enum RecordSerializeFormat
@@ -197,12 +198,12 @@
{
KafkaProducer<Object, ObjectNode> producer = RecordProducerFactory.getForJson(task, schema, task.getOtherProducerConfigs());
PageReader pageReader = new PageReader(schema);
PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
- AtomicInteger counter = new AtomicInteger(0);
- AtomicInteger recordLoggingCount = new AtomicInteger(1);
+ AtomicLong counter = new AtomicLong(0);
+ AtomicLong recordLoggingCount = new AtomicLong(1);
return new TransactionalPageOutput() {
private JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
@Override
@@ -226,10 +227,10 @@
logger.error("produce error", exception);
}
logger.debug("sent record: {key: {}, value: {}}", producerRecord.key(), producerRecord.value());
- int current = counter.incrementAndGet();
+ long current = counter.incrementAndGet();
if (current >= recordLoggingCount.get()) {
logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
recordLoggingCount.set(recordLoggingCount.get() * 2);
}
});