src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.2 vs src/main/java/org/embulk/output/kafka/RecordProducerFactory.java in embulk-output-kafka-0.1.3

- old
+ new

@@ -93,12 +93,17 @@ static KafkaProducer<Object, Object> getForAvroWithSchemaRegistry(KafkaOutputPlugin.PluginTask task, Schema schema, Map<String, String> configs) { KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(); String schemaRegistryUrl = task.getSchemaRegistryUrl().orElseThrow(() -> new ConfigException("avro_with_schema_registry format needs schema_registry_url")); - Map<String, String> avroSerializerConfigs = ImmutableMap.<String, String>builder() - .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl) - .build(); + ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder() + .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + + if (task.getValueSubjectNameStrategy().isPresent()) { + builder.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, task.getValueSubjectNameStrategy().get()); + } + + Map<String, String> avroSerializerConfigs = builder.build(); kafkaAvroSerializer.configure(avroSerializerConfigs, false); return new KafkaProducer<>(buildProperties(task, schema, configs), null, kafkaAvroSerializer); } }