src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/KafkaOutputPlugin.java in embulk-output-kafka-0.1.8

- old
+ new

@@ -3,45 +3,44 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
+import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.embulk.config.Config;
 import org.embulk.config.ConfigDefault;
 import org.embulk.config.ConfigDiff;
 import org.embulk.config.ConfigException;
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
 import org.embulk.config.TaskReport;
 import org.embulk.config.TaskSource;
 import org.embulk.spi.Exec;
 import org.embulk.spi.OutputPlugin;
-import org.embulk.spi.Page;
 import org.embulk.spi.PageReader;
 import org.embulk.spi.Schema;
 import org.embulk.spi.TransactionalPageOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.PrimitiveIterator;
 import java.util.Properties;
-import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class KafkaOutputPlugin
         implements OutputPlugin
 {
     public enum RecordSerializeFormat
@@ -105,11 +104,11 @@
         @Config("partition_column_name")
         @ConfigDefault("null")
         public Optional<String> getPartitionColumnName();
 
         @Config("record_batch_size")
-        @ConfigDefault("1000")
+        @ConfigDefault("16384")
         public int getRecordBatchSize();
 
         @Config("acks")
         @ConfigDefault("\"1\"")
         public String getAcks();
@@ -127,21 +126,25 @@
         public List<String> getIgnoreColumns();
 
         @Config("value_subject_name_strategy")
         @ConfigDefault("null")
         public Optional<String> getValueSubjectNameStrategy();
+
+        @Config("column_for_deletion")
+        @ConfigDefault("null")
+        public Optional<String> getColumnForDeletion();
     }
 
     private static ObjectMapper objectMapper = new ObjectMapper();
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final int SCHEMA_REGISTRY_IDENTITY_MAP_CAPACITY = 1000;
+
     private AdminClient getKafkaAdminClient(PluginTask task)
     {
         Properties properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, task.getBrokers());
-        AdminClient adminClient = AdminClient.create(properties);
-        return adminClient;
+        return AdminClient.create(properties);
     }
 
     @Override
     public ConfigDiff transaction(ConfigSource config,
             Schema schema, int taskCount,
@@ -187,181 +190,79 @@
             case JSON:
                 return buildPageOutputForJson(task, schema, taskIndex);
             case AVRO_WITH_SCHEMA_REGISTRY:
                 return buildPageOutputForAvroWithSchemaRegistry(task, schema, taskIndex);
             default:
-                throw new ConfigException("Unknow serialize format");
+                throw new ConfigException("Unknown serialize format");
         }
     }
 
     private TransactionalPageOutput buildPageOutputForJson(PluginTask task, Schema schema, int taskIndex)
     {
         KafkaProducer<Object, ObjectNode> producer = RecordProducerFactory.getForJson(task, schema, task.getOtherProducerConfigs());
-        PageReader pageReader = new PageReader(schema);
-        PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
-        AtomicLong counter = new AtomicLong(0);
-        AtomicLong recordLoggingCount = new AtomicLong(1);
+        KafkaOutputColumnVisitor<ObjectNode> columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
 
-        return new TransactionalPageOutput() {
-            private JsonFormatColumnVisitor columnVisitor = new JsonFormatColumnVisitor(task, pageReader, objectMapper);
-
-            @Override
-            public void add(Page page)
-            {
-                pageReader.setPage(page);
-                while (pageReader.nextRecord()) {
-                    columnVisitor.reset();
-
-                    pageReader.getSchema().visitColumns(columnVisitor);
-
-                    Object recordKey = columnVisitor.getRecordKey();
-                    if (recordKey == null) {
-                        recordKey = randomLong.next();
-                    }
-
-                    String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
-                    ProducerRecord<Object, ObjectNode> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getJsonNode());
-                    producer.send(producerRecord, (metadata, exception) -> {
-                        if (exception != null) {
-                            logger.error("produce error", exception);
-                        }
-
-                        logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}",
-                                producerRecord.topic(),
-                                producerRecord.key(),
-                                producerRecord.value(),
-                                producerRecord.partition());
-
-                        long current = counter.incrementAndGet();
-                        if (current >= recordLoggingCount.get()) {
-                            logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
-                            recordLoggingCount.set(recordLoggingCount.get() * 2);
-                        }
-                    });
-                }
-            }
-
-            @Override
-            public void finish()
-            {
-                producer.flush();
-            }
-
-            @Override
-            public void close()
-            {
-                producer.close();
-            }
-
-            @Override
-            public void abort()
-            {
-                producer.flush();
-                producer.close();
-            }
-
-            @Override
-            public TaskReport commit()
-            {
-                return null;
-            }
-        };
+        return new JsonFormatTransactionalPageOutput(producer, pageReader, columnVisitor, task.getTopic(), taskIndex);
     }
 
     private TransactionalPageOutput buildPageOutputForAvroWithSchemaRegistry(PluginTask task, Schema schema, int taskIndex)
     {
         KafkaProducer<Object, Object> producer = RecordProducerFactory.getForAvroWithSchemaRegistry(task, schema, task.getOtherProducerConfigs());
-        PageReader pageReader = new PageReader(schema);
+        org.apache.avro.Schema avroSchema = getAvroSchema(task);
+        AvroFormatColumnVisitor avroFormatColumnVisitor = new AvroFormatColumnVisitor(task, pageReader, avroSchema);
+        return new AvroFormatTransactionalPageOutput(producer, pageReader, avroFormatColumnVisitor, task.getTopic(), taskIndex);
+    }
+
+    private org.apache.avro.Schema getAvroSchema(PluginTask task)
+    {
         org.apache.avro.Schema avroSchema = null;
-        if (!task.getAvsc().isPresent() && !task.getAvscFile().isPresent() || task.getAvsc().isPresent() == task.getAvscFile().isPresent()) {
+        if (!task.getSchemaRegistryUrl().isPresent()) {
+            throw new ConfigException("avro_with_schema_registry format needs schema_registry_url");
+        }
+
+        if (task.getAvsc().isPresent() && task.getAvscFile().isPresent()) {
             throw new ConfigException("avro_with_schema_registry format needs either one of avsc and avsc_file");
         }
+
         if (task.getAvsc().isPresent()) {
             avroSchema = new org.apache.avro.Schema.Parser().parse(task.getAvsc().get().toString());
+            return avroSchema;
         }
         if (task.getAvscFile().isPresent()) {
             try {
                 avroSchema = new org.apache.avro.Schema.Parser().parse(task.getAvscFile().get());
+                return avroSchema;
             }
             catch (IOException e) {
                 e.printStackTrace();
                 throw new ConfigException("avsc_file cannot read");
            }
         }
 
-        PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
+        SchemaRegistryClient schemaRegistryClient = getSchemaRegistryClient(task.getSchemaRegistryUrl().get());
+        SubjectNameStrategy subjectNameStrategy = new TopicNameStrategy();
+        String subjectName = subjectNameStrategy.subjectName(task.getTopic(), false, null);
+        try {
+            String schema = schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema();
+            avroSchema = new org.apache.avro.Schema.Parser().parse(schema);
+            return avroSchema;
+        }
+        catch (IOException | RestClientException e) {
+            throw new ConfigException("cannot fetch latest schema from schema registry.", e);
+        }
+    }
 
-        AtomicLong counter = new AtomicLong(0);
-        AtomicLong recordLoggingCount = new AtomicLong(1);
-
-        final org.apache.avro.Schema finalAvroSchema = avroSchema;
-        return new TransactionalPageOutput()
-        {
-            private AvroFormatColumnVisitor columnVisitor = new AvroFormatColumnVisitor(task, pageReader, finalAvroSchema);
-
-            @Override
-            public void add(Page page)
-            {
-                pageReader.setPage(page);
-                while (pageReader.nextRecord()) {
-                    columnVisitor.reset();
-
-                    pageReader.getSchema().visitColumns(columnVisitor);
-
-                    Object recordKey = columnVisitor.getRecordKey();
-                    if (recordKey == null) {
-                        recordKey = randomLong.next();
-                    }
-
-                    String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : task.getTopic();
-
-                    ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getGenericRecord());
-                    producer.send(producerRecord, (metadata, exception) -> {
-                        if (exception != null) {
-                            logger.error("produce error", exception);
-                        }
-
-                        logger.debug("sent record: {topic: {}, key: {}, value: {}, partition: {}}",
-                                producerRecord.topic(),
-                                producerRecord.key(),
-                                producerRecord.value(),
-                                producerRecord.partition());
-
-                        long current = counter.incrementAndGet();
-                        if (current >= recordLoggingCount.get()) {
-                            logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
-                            recordLoggingCount.set(recordLoggingCount.get() * 2);
-                        }
-                    });
-                }
-            }
-
-            @Override
-            public void finish()
-            {
-                producer.flush();
-            }
-
-            @Override
-            public void close()
-            {
-                producer.close();
-            }
-
-            @Override
-            public void abort()
-            {
-                producer.flush();
-                producer.close();
-            }
-
-            @Override
-            public TaskReport commit()
-            {
-                return null;
-            }
-        };
+    private static final String MOCK_SCHEMA_REGISTRY_PREFIX = "mock://";
+    private SchemaRegistryClient getSchemaRegistryClient(String url)
+    {
+        if (url.startsWith(MOCK_SCHEMA_REGISTRY_PREFIX)) {
+            String mockScope = url.substring(MOCK_SCHEMA_REGISTRY_PREFIX.length());
+            return MockSchemaRegistry.getClientForScope(mockScope);
+        }
+        else {
+            return new CachedSchemaRegistryClient(url, SCHEMA_REGISTRY_IDENTITY_MAP_CAPACITY);
+        }
     }
 }
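Note: the JsonFormatTransactionalPageOutput and AvroFormatTransactionalPageOutput classes that the new code returns live in separate files and are not part of this diff. As a rough orientation only, the sketch below shows what such a class could look like if it simply carries over the behaviour of the removed anonymous TransactionalPageOutput above (random record-key fallback, per-record topic override, doubling progress log). The class name, field names, constructor signature, and the use of the concrete JsonFormatColumnVisitor type (whose reset/getRecordKey/getTopicName/getPartition/getJsonNode methods appear in the removed 0.1.7 code) are assumptions for illustration, not the actual 0.1.8 source.

    // Hypothetical sketch only; inferred from the removed anonymous class above.
    package org.embulk.output.kafka;

    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.embulk.config.TaskReport;
    import org.embulk.spi.Page;
    import org.embulk.spi.PageReader;
    import org.embulk.spi.TransactionalPageOutput;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.PrimitiveIterator;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicLong;

    class JsonFormatTransactionalPageOutputSketch implements TransactionalPageOutput
    {
        private static final Logger logger = LoggerFactory.getLogger(JsonFormatTransactionalPageOutputSketch.class);

        private final KafkaProducer<Object, ObjectNode> producer;
        private final PageReader pageReader;
        private final JsonFormatColumnVisitor columnVisitor;
        private final String defaultTopic;
        private final int taskIndex;

        // State the removed anonymous class kept as local variables of the build method.
        private final PrimitiveIterator.OfLong randomLong = new Random().longs(1, Long.MAX_VALUE).iterator();
        private final AtomicLong counter = new AtomicLong(0);
        private final AtomicLong recordLoggingCount = new AtomicLong(1);

        JsonFormatTransactionalPageOutputSketch(KafkaProducer<Object, ObjectNode> producer, PageReader pageReader,
                JsonFormatColumnVisitor columnVisitor, String defaultTopic, int taskIndex)
        {
            this.producer = producer;
            this.pageReader = pageReader;
            this.columnVisitor = columnVisitor;
            this.defaultTopic = defaultTopic;
            this.taskIndex = taskIndex;
        }

        @Override
        public void add(Page page)
        {
            pageReader.setPage(page);
            while (pageReader.nextRecord()) {
                columnVisitor.reset();
                pageReader.getSchema().visitColumns(columnVisitor);

                // Fall back to a random key when the input has no key column, as before.
                Object recordKey = columnVisitor.getRecordKey();
                if (recordKey == null) {
                    recordKey = randomLong.next();
                }
                // A topic column on the record overrides the configured default topic.
                String targetTopic = columnVisitor.getTopicName() != null ? columnVisitor.getTopicName() : defaultTopic;

                ProducerRecord<Object, ObjectNode> producerRecord =
                        new ProducerRecord<>(targetTopic, columnVisitor.getPartition(), recordKey, columnVisitor.getJsonNode());
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        logger.error("produce error", exception);
                    }
                    // Log progress at 1, 2, 4, 8, ... records, as the removed code did.
                    long current = counter.incrementAndGet();
                    if (current >= recordLoggingCount.get()) {
                        logger.info("[task-{}] Producer sent {} records", String.format("%04d", taskIndex), current);
                        recordLoggingCount.set(recordLoggingCount.get() * 2);
                    }
                });
            }
        }

        @Override
        public void finish()
        {
            producer.flush();
        }

        @Override
        public void close()
        {
            producer.close();
        }

        @Override
        public void abort()
        {
            producer.flush();
            producer.close();
        }

        @Override
        public TaskReport commit()
        {
            return null;
        }
    }

The call site in 0.1.8 passes the visitor as KafkaOutputColumnVisitor<ObjectNode>, so the real class presumably depends on that base visitor type rather than the concrete JsonFormatColumnVisitor used in this sketch.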