src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.8
- old
+ new
@@ -3,45 +3,44 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
-import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
-import java.util.PrimitiveIterator;
import java.util.Properties;
-import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
public class KafkaOutputPlugin
implements OutputPlugin
{
public enum RecordSerializeFormat
@@ -105,11 +104,11 @@
@Config("partition_column_name")
@ConfigDefault("null")
public Optional<String> getPartitionColumnName();
@Config("record_batch_size")
- @ConfigDefault("1000")
+ @ConfigDefault("16384")
public int getRecordBatchSize();
@Config("acks")
@ConfigDefault("\"1\"")
public String getAcks();
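Note: the record_batch_size default changes from 1000 to 16384 here. 16384 bytes is exactly the Kafka producer's stock batch.size, which suggests the option is forwarded straight into the producer configuration rather than being treated as a record count. RecordProducerFactory is not part of this diff, so the wiring below is a hedged sketch of that mapping, not the plugin's actual code:

    // Hedged sketch of how these task options plausibly reach the producer
    // config; RecordProducerFactory is not shown in this diff, so the exact
    // mapping is an assumption (ProducerConfig is the standard
    // org.apache.kafka.clients.producer.ProducerConfig).
    static Properties producerProps(KafkaOutputPlugin.PluginTask task)
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, task.getBrokers());
        props.put(ProducerConfig.ACKS_CONFIG, task.getAcks());                  // "acks" option
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, task.getRecordBatchSize()); // "record_batch_size" option
        props.putAll(task.getOtherProducerConfigs());                           // user pass-through
        // key/value serializers depend on record_serialize_format and are left to the factory
        return props;
    }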
@@ -127,21 +126,25 @@
public List<String> getIgnoreColumns();
@Config("value_subject_name_strategy")
@ConfigDefault("null")
public Optional<String> getValueSubjectNameStrategy();
+
+ @Config("column_for_deletion")
+ @ConfigDefault("null")
+ public Optional<String> getColumnForDeletion();
}
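column_for_deletion is new in 0.1.8. The visitor and page-output classes that consume it are not part of this diff, but the natural Kafka reading is a tombstone: when the named column is true for a row, the record is produced with a null value so a log-compacted topic can drop the key. A hedged sketch of that path (isDeletion() is an invented accessor name, not a confirmed API):

    // Hypothetical deletion path; isDeletion() is an assumed accessor name and
    // the consuming classes are not shown in this diff.
    ObjectNode value = columnVisitor.getJsonNode();
    if (columnVisitor.isDeletion()) {
        value = null;  // a null value is a tombstone on a log-compacted topic
    }
    producer.send(new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, value));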
private static ObjectMapper objectMapper = new ObjectMapper();
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final int SCHEMA_REGISTRY_IDENTITY_MAP_CAPACITY = 1000;
+
private AdminClient getKafkaAdminClient(PluginTask task)
{
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, task.getBrokers());
- AdminClient adminClient = AdminClient.create(properties);
- return adminClient;
+ return AdminClient.create(properties);
}
@Override
public ConfigDiff transaction(ConfigSource config,
Schema schema, int taskCount,
@@ -187,181 +190,79 @@
case JSON:
return buildPageOutputForJson(task, schema, taskIndex);
case AVRO_WITH_SCHEMA_REGISTRY:
return buildPageOutputForAvroWithSchemaRegistry(task, schema, taskIndex);
default:
- throw new ConfigException("Unknow serialize format");
+ throw new ConfigException("Unknown serialize format");
}
}
private TransactionalPageOutput buildPageOutputForJson(PluginTask task, Schema schema, int taskIndex)
{
KafkaProducer<Object, ObjectNode> producer = RecordProducerFactory.getForJson(task, schema, task.getOtherProducerConfigs());
-
PageReader pageReader = new PageReader(schema);
- PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
- AtomicLong counter = new AtomicLong(0);
- AtomicLong recordLoggingCount = new AtomicLong(1);
+ KafkaOutputColumnVisitor<ObjectNode> columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
- return new TransactionalPageOutput() {
- private JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
-
- @Override
- public void add(Page page)
- {
- pageReader.setPage(page);
- while (pageReader.nextRecord()) {
- columnVisitor.reset();
-
- pageReader.getSchema().visitColumns(columnVisitor);
-
- Object recordKey = columnVisitor.getRecordKey();
- if (recordKey == null) {
- recordKey = randomLong.next();
- }
-
- String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
- ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getJsonNode());
- producer.send(producerRecord, (metadata, exception) -> {
- if (exception != null) {
- logger.error("produce error", exception);
- }
-
- logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}",
- producerRecord.topic(),
- producerRecord.key(),
- producerRecord.value(),
- producerRecord.partition());
-
- long current = counter.incrementAndGet();
- if (current >= recordLoggingCount.get()) {
- logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
- recordLoggingCount.set(recordLoggingCount.get() * 2);
- }
- });
- }
- }
-
- @Override
- public void finish()
- {
- producer.flush();
- }
-
- @Override
- public void close()
- {
- producer.close();
- }
-
- @Override
- public void abort()
- {
- producer.flush();
- producer.close();
- }
-
- @Override
- public TaskReport commit()
- {
- return null;
- }
- };
+ return new JsonFormatTransactionalPageOutput(producer, pageReader, columnVisitor, task.getTopic(), taskIndex);
}
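The anonymous TransactionalPageOutput deleted above now lives in JsonFormatTransactionalPageOutput (with an Avro twin used below), neither of which appears in this diff. Reconstructed from the deleted body, the extracted add() presumably keeps the same shape:

    // Sketch reconstructed from the deleted anonymous class above; the real
    // extracted class is not shown in this diff.
    @Override
    public void add(Page page)
    {
        pageReader.setPage(page);
        while (pageReader.nextRecord()) {
            columnVisitor.reset();
            pageReader.getSchema().visitColumns(columnVisitor);

            Object recordKey = columnVisitor.getRecordKey();  // the old inline code fell back to a random long when null
            String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : defaultTopic;
            producer.send(new ProducerRecord<>(targetTopic, columnVisitor.getPartition(),
                    recordKey, columnVisitor.getJsonNode()));
        }
    }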
private TransactionalPageOutput buildPageOutputForAvroWithSchemaRegistry(PluginTask task, Schema schema, int taskIndex)
{
KafkaProducer<Object, Object> producer = RecordProducerFactory.getForAvroWithSchemaRegistry(task, schema, task.getOtherProducerConfigs());
-
PageReader pageReader = new PageReader(schema);
+ org.apache.avro.Schema avroSchema = getAvroSchema(task);
+ AvroFormatColumnVisitor avroFormatColumnVisitor = new AvroFormatColumnVisitor(task, pageReader, avroSchema);
+ return new AvroFormatTransactionalPageOutput(producer, pageReader, avroFormatColumnVisitor, task.getTopic(), taskIndex);
+ }
+
+ private org.apache.avro.Schema getAvroSchema(PluginTask task)
+ {
org.apache.avro.Schema avroSchema = null;
- if (!task.getAvsc().isPresent() && !task.getAvscFile().isPresent() || task.getAvsc().isPresent() == task.getAvscFile().isPresent()) {
+ if (!task.getSchemaRegistryUrl().isPresent()) {
+ throw new ConfigException("avro_with_schema_registry format needs schema_registry_url");
+ }
+
+ if (task.getAvsc().isPresent() && task.getAvscFile().isPresent()) {
throw new ConfigException("avro_with_schema_registry format needs either one of avsc and avsc_file");
}
+
if (task.getAvsc().isPresent()) {
avroSchema = new org.apache.avro.Schema.Parser().parse(task.getAvsc().get().toString());
+ return avroSchema;
}
if (task.getAvscFile().isPresent()) {
try {
avroSchema = new org.apache.avro.Schema.Parser().parse(task.getAvscFile().get());
+ return avroSchema;
}
catch (IOException e) {
e.printStackTrace();
throw new ConfigException("avsc_file cannot read");
}
}
- PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
+ SchemaRegistryClient schemaRegistryClient = getSchemaRegistryClient(task.getSchemaRegistryUrl().get());
+ SubjectNameStrategy subjectNameStrategy = new TopicNameStrategy();
+ String subjectName = subjectNameStrategy.subjectName(task.getTopic(), false, null);
+ try {
+ String schema = schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema();
+ avroSchema = new org.apache.avro.Schema.Parser().parse(schema);
+ return avroSchema;
+ }
+ catch (IOException | RestClientException e) {
+ throw new ConfigException("cannot fetch latest schema from schema registry.", e);
+ }
+ }
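Also new in 0.1.8: when neither avsc nor avsc_file is given, getAvroSchema() falls back to fetching the latest writer schema from the registry. The subject is hard-wired to TopicNameStrategy, so the fallback only covers the <topic>-value naming convention:

    SubjectNameStrategy strategy = new TopicNameStrategy();
    strategy.subjectName("my-events", false, null);  // "my-events-value", the subject getAvroSchema() looks up
    strategy.subjectName("my-events", true, null);   // "my-events-key" (key subjects are not consulted here)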
- AtomicLong counter = new AtomicLong(0);
- AtomicLong recordLoggingCount = new AtomicLong(1);
-
- final org.apache.avro.Schema finalAvroSchema = avroSchema;
- return new TransactionalPageOutput()
- {
- private AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema);
-
- @Override
- public void add(Page page)
- {
- pageReader.setPage(page);
- while (pageReader.nextRecord()) {
- columnVisitor.reset();
-
- pageReader.getSchema().visitColumns(columnVisitor);
-
- Object recordKey = columnVisitor.getRecordKey();
- if (recordKey == null) {
- recordKey = randomLong.next();
- }
-
- String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
-
- ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getGenericRecord());
- producer.send(producerRecord, (metadata, exception) -> {
- if (exception != null) {
- logger.error("produce error", exception);
- }
-
- logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}",
- producerRecord.topic(),
- producerRecord.key(),
- producerRecord.value(),
- producerRecord.partition());
-
- long current = counter.incrementAndGet();
- if (current >= recordLoggingCount.get()) {
- logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
- recordLoggingCount.set(recordLoggingCount.get() * 2);
- }
- });
- }
- }
-
- @Override
- public void finish()
- {
- producer.flush();
- }
-
- @Override
- public void close()
- {
- producer.close();
- }
-
- @Override
- public void abort()
- {
- producer.flush();
- producer.close();
- }
-
- @Override
- public TaskReport commit()
- {
- return null;
- }
- };
+ private static final String MOCK_SCHEMA_REGISTRY_PREFIX = "mock://";
+ private SchemaRegistryClient getSchemaRegistryClient(String url)
+ {
+ if (url.startsWith(MOCK_SCHEMA_REGISTRY_PREFIX)) {
+ String mockScope = url.substring(MOCK_SCHEMA_REGISTRY_PREFIX.length());
+ return MockSchemaRegistry.getClientForScope(mockScope);
+ }
+ else {
+ return new CachedSchemaRegistryClient(url, SCHEMA_REGISTRY_IDENTITY_MAP_CAPACITY);
+ }
}
}
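The mock:// scheme mirrors the convention Confluent's own serializers accept for schema.registry.url, so a test and the plugin can share one in-memory registry by scope name. A minimal test-side sketch, assuming io.confluent:kafka-schema-registry-client 5.5+ (where MockSchemaRegistry and io.confluent.kafka.schemaregistry.avro.AvroSchema live); the scope, topic, and schema below are illustrative only:

    // Test-side sketch (assumes kafka-schema-registry-client 5.5+).
    SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("embulk-test");
    org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    client.register("my-events-value", new AvroSchema(schema));  // subject = <topic>-value per TopicNameStrategy
    // An Embulk config with schema_registry_url: "mock://embulk-test" and
    // topic: "my-events" would now resolve this schema in getAvroSchema() above.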