src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java: embulk-output-kafka-0.1.3 vs embulk-output-kafka-0.1.4
- old
+ new
@@ -2,22 +2,24 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.avro.generic.GenericData;
+import com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
-import org.embulk.spi.ColumnConfig;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
@@ -30,11 +32,15 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.PrimitiveIterator;
+import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
public class KafkaOutputPlugin
implements OutputPlugin
{
@@ -94,10 +100,14 @@
@Config("key_column_name")
@ConfigDefault("null")
public Optional<String> getKeyColumnName();
+ @Config("partition_column_name")
+ @ConfigDefault("null")
+ public Optional<String> getPartitionColumnName();
+
@Config("record_batch_size")
@ConfigDefault("1000")
public int getRecordBatchSize();
@Config("acks")
@@ -122,21 +132,35 @@
}
private static ObjectMapper objectMapper = new ObjectMapper();
private Logger logger = LoggerFactory.getLogger(getClass());
+ private AdminClient getKafkaAdminClient(PluginTask task)
+ {
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, task.getBrokers());
+ return AdminClient.create(properties);
+ }
+
@Override
public ConfigDiff transaction(ConfigSource config,
Schema schema, int taskCount,
Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
+ try (AdminClient adminClient = getKafkaAdminClient(task)) {
+ DescribeTopicsResult result = adminClient.describeTopics(ImmutableList.of(task.getTopic()));
+ // try-with-resources closes the AdminClient even when the check throws
+ if (result.all().get(30, TimeUnit.SECONDS).isEmpty()) {
+ throw new RuntimeException("target topic is not found");
+ }
+ }
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException("failed to connect to kafka brokers", e);
+ }
- // retryable (idempotent) output:
- // return resume(task.dump(), schema, taskCount, control);
-
- // non-retryable (non-idempotent) output:
control.run(task.dump());
return Exec.newConfigDiff();
}
@Override
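
With this hunk, `transaction()` fails fast, before any pages are produced, when the brokers are unreachable or the target topic does not exist. A standalone sketch of the same pre-flight check, assuming a broker at `localhost:9092` and a topic named `my-topic` (both placeholders):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class TopicPreflightCheck
{
    public static void main(String[] args) throws Exception
    {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient client = AdminClient.create(props)) {
            // A missing topic surfaces as an ExecutionException wrapping
            // UnknownTopicOrPartitionException; an unreachable broker as a
            // TimeoutException once the 30-second deadline passes.
            client.describeTopics(Collections.singletonList("my-topic"))
                    .all()
                    .get(30, TimeUnit.SECONDS);
        }
    }
}
```
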
@@ -177,26 +201,28 @@
PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
AtomicInteger counter = new AtomicInteger(0);
AtomicInteger recordLoggingCount = new AtomicInteger(1);
return new TransactionalPageOutput() {
+ private JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
+
@Override
public void add(Page page)
{
pageReader.setPage(page);
while (pageReader.nextRecord()) {
- JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
+ columnVisitor.reset();
pageReader.getSchema().visitColumns(columnVisitor);
- Object recordKey = columnVisitor.recordKey;
+ Object recordKey = columnVisitor.getRecordKey();
if (recordKey == null) {
recordKey = randomLong.next();
}
- String targetTopic = columnVisitor.topicName != null ? columnVisitor.topicName : task.getTopic();
- ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.jsonNode);
+ String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
+ ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.getJsonNode());
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("produce error", exception);
}
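
In both format paths the column visitor is now allocated once per output task and `reset()` per record, instead of being constructed anew for every record, which trims per-record allocations on the hot path. The visitor classes themselves are outside this diff; a hedged sketch of the per-record state the JSON variant's `reset()` would have to clear (field names are assumptions):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch only; field and method names are assumptions, not the plugin's code.
class JsonVisitorStateSketch
{
    private static final ObjectMapper MAPPER = new ObjectMapper();

    ObjectNode jsonNode = MAPPER.createObjectNode();
    Object recordKey;
    String topicName;

    // Called once per record so a single reused instance cannot leak the
    // previous record's key, topic, or accumulated fields into the next one.
    void reset()
    {
        jsonNode = MAPPER.createObjectNode();
        recordKey = null;
        topicName = null;
    }
}
```
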
@@ -267,26 +293,28 @@
AtomicInteger recordLoggingCount = new AtomicInteger(1);
final org.apache.avro.Schema finalAvroSchema = avroSchema;
return new TransactionalPageOutput()
{
+ private AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema);
+
@Override
public void add(Page page)
{
pageReader.setPage(page);
while (pageReader.nextRecord()) {
- AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema, new GenericData.Record(finalAvroSchema));
+ columnVisitor.reset();
pageReader.getSchema().visitColumns(columnVisitor);
- Object recordKey = columnVisitor.recordKey;
+ Object recordKey = columnVisitor.getRecordKey();
if (recordKey == null) {
recordKey = randomLong.next();
}
- String targetTopic = columnVisitor.topicName != null ? columnVisitor.topicName : task.getTopic();
+ String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
- ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(targetTopic, recordKey, columnVisitor.genericRecord);
+ ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getGenericRecord());
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("produce error", exception);
}
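
The Avro hunk is where `getPartition()` gets wired into the producer: the four-argument `ProducerRecord` constructor takes an explicit partition, and a `null` partition falls back to the producer's partitioner, preserving the old routing for rows without a partition column. A runnable illustration against a placeholder broker, with `String` serializers for brevity:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class PartitionedSendSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The four-argument constructor pins the record to a partition;
            // a null Integer here defers to the producer's partitioner, so
            // records without a partition column keep the old routing.
            Integer partition = 0;
            producer.send(new ProducerRecord<>("my-topic", partition, "key", "value"));
        }
    }
}
```
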