src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.2 vs src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.3
- old
+ new
@@ -93,12 +93,17 @@
static KafkaProducer<Object, Object> getForAvroWithSchemaRegistry(KafkaOutputPlugin.PluginTask task, Schema schema, Map<String, String> configs)
{
KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
String schemaRegistryUrl = task.getSchemaRegistryUrl().orElseThrow(() -> new ConfigException("avro_with_schema_registry format needs schema_registry_url"));
- Map<String, String> avroSerializerConfigs = ImmutableMap.<String, String>builder()
- .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
- .build();
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
+ .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+
+ if (task.getValueSubjectNameStrategy().isPresent()) {
+ builder.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, task.getValueSubjectNameStrategy().get());
+ }
+
+ Map<String, String> avroSerializerConfigs = builder.build();
kafkaAvroSerializer.configure(avroSerializerConfigs, false);
return new KafkaProducer<>(buildProperties(task, schema, configs), null, kafkaAvroSerializer);
}
}