src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.8
- old
+ new
@@ -1,10 +1,10 @@
package org.embulk.output.kafka;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -94,13 +94,13 @@
{
KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
String schemaRegistryUrl = task.getSchemaRegistryUrl().orElseThrow(() -> new ConfigException("avro_with_schema_registry format needs schema_registry_url"));
ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
- .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+ .put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
if (task.getValueSubjectNameStrategy().isPresent()) {
- builder.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, task.getValueSubjectNameStrategy().get());
+ builder.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, task.getValueSubjectNameStrategy().get());
}
Map<String, String> avroSerializerConfigs = builder.build();
kafkaAvroSerializer.configure(avroSerializerConfigs, false);