src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.8

- old
+ new

@@ -1,10 +1,10 @@ package org.embulk.output.kafka; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.DoubleSerializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -94,13 +94,13 @@ { KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(); String schemaRegistryUrl = task.getSchemaRegistryUrl().orElseThrow(() -> new ConfigException("avro_with_schema_registry format needs schema_registry_url")); ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder() - .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + .put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); if (task.getValueSubjectNameStrategy().isPresent()) { - builder.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, task.getValueSubjectNameStrategy().get()); + builder.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, task.getValueSubjectNameStrategy().get()); } Map<String, String> avroSerializerConfigs = builder.build(); kafkaAvroSerializer.configure(avroSerializerConfigs, false);