src/test/java/org/embulk/output/kafka/TestKafkaOutputPlugin.java in embulk-output-kafka-0.1.7 vs src/test/java/org/embulk/output/kafka/TestKafkaOutputPlugin.java in embulk-output-kafka-0.1.8
--- src/test/java/org/embulk/output/kafka/TestKafkaOutputPlugin.java (embulk-output-kafka-0.1.7)
+++ src/test/java/org/embulk/output/kafka/TestKafkaOutputPlugin.java (embulk-output-kafka-0.1.8)
@@ -1,5 +1,389 @@
package org.embulk.output.kafka;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Resources;
+import com.salesforce.kafka.test.KafkaTestUtils;
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.embulk.config.ConfigSource;
+import org.embulk.spi.OutputPlugin;
+import org.embulk.test.TestingEmbulk;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
public class TestKafkaOutputPlugin
{
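+ // Shared embedded Kafka cluster with three brokers, spun up once for the whole test class.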
+ @ClassRule
+ public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
+ .withBrokers(3);
+
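+ // Embulk test harness with this plugin registered as the "kafka" output type.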
+ @Rule
+ public TestingEmbulk embulk = TestingEmbulk.builder()
+ .registerPlugin(OutputPlugin.class, "kafka", KafkaOutputPlugin.class)
+ .build();
+
+ private KafkaTestUtils kafkaTestUtils;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
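+ // Recreate the four test topics before each test: 8 partitions, replication factor 1.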
+ @Before
+ public void setUp()
+ {
+ kafkaTestUtils = sharedKafkaTestResource.getKafkaTestUtils();
+ kafkaTestUtils.createTopic("json-topic", 8, (short) 1);
+ kafkaTestUtils.createTopic("json-complex-topic", 8, (short) 1);
+ kafkaTestUtils.createTopic("avro-simple-topic", 8, (short) 1);
+ kafkaTestUtils.createTopic("avro-complex-topic", 8, (short) 1);
+ }
+
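+ // Note: AdminClient.deleteTopics is asynchronous; this only issues the delete request.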
+ @After
+ public void tearDown()
+ {
+ kafkaTestUtils.getAdminClient().deleteTopics(ImmutableList.of(
+ "json-topic", "json-complex-topic", "avro-simple-topic", "avro-complex-topic"
+ ));
+ }
+
+ @Test
+ public void testSimpleJson() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_simple.yml");
+ configSource.set("brokers", ImmutableList.of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in1.csv").getPath()));
+ List<ConsumerRecord<String, String>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("json-topic", StringDeserializer.class,
+ StringDeserializer.class);
+
+ assertEquals(3, consumerRecords.size());
+ List<JsonNode> deserializedRecords = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : consumerRecords) {
+ deserializedRecords.add(objectMapper.readTree(record.value()));
+ }
+ List<String> ids = deserializedRecords.stream()
+ .map(r -> r.get("id").asText())
+ .collect(Collectors.toList());
+ List<Integer> intItems = deserializedRecords.stream()
+ .map(r -> r.get("int_item").asInt())
+ .collect(Collectors.toList());
+ List<String> varcharItems = deserializedRecords.stream()
+ .map(r -> r.get("varchar_item").asText())
+ .collect(Collectors.toList());
+
+ assertThat(ids, hasItem("A001"));
+ assertThat(ids, hasItem("A002"));
+ assertThat(ids, hasItem("A003"));
+ assertThat(intItems, hasItem(1));
+ assertThat(intItems, hasItem(2));
+ assertThat(intItems, hasItem(3));
+ assertThat(varcharItems, hasItem("a"));
+ assertThat(varcharItems, hasItem("b"));
+ assertThat(varcharItems, hasItem("c"));
+ }
+
+ @Test
+ public void testComplexJson() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_complex.yml");
+ configSource.set("brokers", ImmutableList.of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in_complex.csv").getPath()));
+ List<ConsumerRecord<String, String>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("json-complex-topic", StringDeserializer.class,
+ StringDeserializer.class);
+
+ assertEquals(3, consumerRecords.size());
+ List<JsonNode> deserializedRecords = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : consumerRecords) {
+ deserializedRecords.add(objectMapper.readTree(record.value()));
+ }
+ List<String> ids = deserializedRecords.stream()
+ .map(r -> r.get("id").asText())
+ .collect(Collectors.toList());
+ List<Integer> intItems = deserializedRecords.stream()
+ .map(r -> r.get("int_item").asInt())
+ .collect(Collectors.toList());
+ List<List<Integer>> arrayItems = deserializedRecords.stream()
+ .map(r -> ImmutableList.of(
+ r.get("array").get(0).asInt(),
+ r.get("array").get(1).asInt(),
+ r.get("array").get(2).asInt()
+ ))
+ .collect(Collectors.toList());
+
+ assertThat(ids, hasItem("A001"));
+ assertThat(ids, hasItem("A002"));
+ assertThat(ids, hasItem("A003"));
+ assertThat(intItems, hasItem(9));
+ assertThat(intItems, hasItem(0));
+ assertThat(arrayItems.get(0), hasItem(1));
+ assertThat(arrayItems.get(0), hasItem(2));
+ assertThat(arrayItems.get(0), hasItem(3));
+ }
+
+ @Test
+ public void testSimpleAvro() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_simple_avro.yml");
+ configSource.set("brokers", ImmutableList
+ .of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in1.csv").getPath()));
+
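+ // config_simple_avro.yml presumably points schema_registry_url at mock://embulk-output-kafka,
+ // so fetching the client for the same scope lets the test decode what the plugin produced.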
+ SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry
+ .getClientForScope("embulk-output-kafka");
+ KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+
+ List<ConsumerRecord<byte[], byte[]>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("avro-simple-topic");
+
+ assertEquals(3, consumerRecords.size());
+ List<GenericRecord> genericRecords = consumerRecords.stream()
+ .map(r -> (GenericRecord) kafkaAvroDeserializer.deserialize("avro-simple-topic", r.value()))
+ .collect(Collectors.toList());
+
+ List<String> ids = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("id")))
+ .collect(Collectors.toList());
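+ // int_item is declared as an Avro long, so the deserialized values come back as Longs.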
+ List<Long> intItems = genericRecords.stream()
+ .map(r -> (Long) r.get("int_item"))
+ .collect(Collectors.toList());
+ List<String> varcharItems = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("varchar_item")))
+ .collect(Collectors.toList());
+
+ assertThat(ids, hasItem("A001"));
+ assertThat(ids, hasItem("A002"));
+ assertThat(ids, hasItem("A003"));
+ assertThat(intItems, hasItem(1L));
+ assertThat(intItems, hasItem(2L));
+ assertThat(intItems, hasItem(3L));
+ assertThat(varcharItems, hasItem("a"));
+ assertThat(varcharItems, hasItem("b"));
+ assertThat(varcharItems, hasItem("c"));
+ }
+
+ @Test
+ public void testSimpleAvroSchemaFromRegistry() throws IOException, RestClientException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_simple_avro.yml");
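+ // Strip the inline avsc from the config and pre-register the schema under the default
+ // TopicNameStrategy subject "avro-simple-topic-value", forcing the plugin to fetch it
+ // from the registry instead.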
+ Object avsc = configSource.get(Object.class, "avsc");
+ String avscString = objectMapper.writeValueAsString(avsc);
+ configSource.set("avsc", null);
+ ParsedSchema parsedSchema = new AvroSchema(avscString);
+ MockSchemaRegistry.getClientForScope("embulk-output-kafka")
+ .register("avro-simple-topic-value", parsedSchema);
+ configSource.set("brokers", ImmutableList
+ .of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in1.csv").getPath()));
+
+ SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry
+ .getClientForScope("embulk-output-kafka");
+ KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+
+ List<ConsumerRecord<byte[], byte[]>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("avro-simple-topic");
+
+ assertEquals(3, consumerRecords.size());
+ List<GenericRecord> genericRecords = consumerRecords.stream()
+ .map(r -> (GenericRecord) kafkaAvroDeserializer.deserialize("avro-simple-topic", r.value()))
+ .collect(Collectors.toList());
+
+ List<String> ids = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("id")))
+ .collect(Collectors.toList());
+ List<Long> intItems = genericRecords.stream()
+ .map(r -> (Long) r.get("int_item"))
+ .collect(Collectors.toList());
+ List<String> varcharItems = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("varchar_item")))
+ .collect(Collectors.toList());
+
+ assertThat(ids, hasItem("A001"));
+ assertThat(ids, hasItem("A002"));
+ assertThat(ids, hasItem("A003"));
+ assertThat(intItems, hasItem(1L));
+ assertThat(intItems, hasItem(2L));
+ assertThat(intItems, hasItem(3L));
+ assertThat(varcharItems, hasItem("a"));
+ assertThat(varcharItems, hasItem("b"));
+ assertThat(varcharItems, hasItem("c"));
+ }
+
+ @Test
+ public void testSimpleAvroAvscFile() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_simple_avro_avsc_file.yml");
+ configSource.set("brokers", ImmutableList
+ .of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in1.csv").getPath()));
+
+ SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry
+ .getClientForScope("embulk-output-kafka");
+ KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+
+ List<ConsumerRecord<byte[], byte[]>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("avro-simple-topic");
+
+ assertEquals(3, consumerRecords.size());
+ List<GenericRecord> genericRecords = consumerRecords.stream()
+ .map(r -> (GenericRecord) kafkaAvroDeserializer.deserialize("avro-simple-topic", r.value()))
+ .collect(Collectors.toList());
+
+ List<String> ids = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("id")))
+ .collect(Collectors.toList());
+ List<Long> intItems = genericRecords.stream()
+ .map(r -> (Long) r.get("int_item"))
+ .collect(Collectors.toList());
+ List<String> varcharItems = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("varchar_item")))
+ .collect(Collectors.toList());
+
+ assertThat(ids, hasItem("A001"));
+ assertThat(ids, hasItem("A002"));
+ assertThat(ids, hasItem("A003"));
+ assertThat(intItems, hasItem(1L));
+ assertThat(intItems, hasItem(2L));
+ assertThat(intItems, hasItem(3L));
+ assertThat(varcharItems, hasItem("a"));
+ assertThat(varcharItems, hasItem("b"));
+ assertThat(varcharItems, hasItem("c"));
+ }
+
+ @Test
+ public void testSimpleAvroComplex() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_complex_avro.yml");
+ configSource.set("brokers", ImmutableList
+ .of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in_complex.csv").getPath()));
+
+ SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry
+ .getClientForScope("embulk-output-kafka");
+ KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+
+ List<ConsumerRecord<byte[], byte[]>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("avro-complex-topic");
+
+ assertEquals(3, consumerRecords.size());
+ List<GenericRecord> genericRecords = consumerRecords.stream()
+ .map(r -> (GenericRecord) kafkaAvroDeserializer.deserialize("avro-complex-topic", r.value()))
+ .collect(Collectors.toList());
+
+ List<String> ids = genericRecords.stream()
+ .map(r -> String.valueOf(r.get("id")))
+ .collect(Collectors.toList());
+ List<Long> intItems = genericRecords.stream()
+ .map(r -> (Long) r.get("int_item"))
+ .collect(Collectors.toList());
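+ // "time" presumably uses the timestamp-millis logical type, so the raw value is epoch milliseconds.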
+ List<Instant> timeItems = genericRecords.stream()
+ .map(r -> Instant.ofEpochMilli((long) r.get("time")))
+ .collect(Collectors.toList());
+
+ assertThat(ids, hasItem("A001"));
+ assertThat(ids, hasItem("A002"));
+ assertThat(ids, hasItem("A003"));
+ assertThat(intItems, hasItem(9L));
+ assertThat(intItems, hasItem(0L));
+ assertThat(timeItems, hasItem(Instant.parse("2018-02-01T12:15:18.000Z")));
+ assertThat(timeItems, hasItem(Instant.parse("2018-02-02T12:15:18.000Z")));
+ assertThat(timeItems, hasItem(Instant.parse("2018-02-03T12:15:18.000Z")));
+ }
+
+ @Test
+ public void testKeyColumnConfig() throws IOException
+ {
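+ // config_with_key_column.yml designates the id column as the Kafka record key.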
+ ConfigSource configSource = embulk.loadYamlResource("config_with_key_column.yml");
+ configSource.set("brokers", ImmutableList.of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in1.csv").getPath()));
+ List<ConsumerRecord<String, String>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("json-topic", StringDeserializer.class,
+ StringDeserializer.class);
+
+ assertEquals(3, consumerRecords.size());
+ List<String> keys = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : consumerRecords) {
+ keys.add(record.key());
+ }
+
+ assertThat(keys, hasItem("A001"));
+ assertThat(keys, hasItem("A002"));
+ assertThat(keys, hasItem("A003"));
+ }
+
+ @Test
+ public void testPartitionColumnConfig() throws IOException
+ {
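+ // config_with_partition_column.yml routes each row to the partition given by one of its
+ // columns; in1.csv is expected to target partitions 1, 2, and 3.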
+ ConfigSource configSource = embulk.loadYamlResource("config_with_partition_column.yml");
+ configSource.set("brokers", ImmutableList.of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in1.csv").getPath()));
+ List<ConsumerRecord<String, String>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("json-topic", StringDeserializer.class,
+ StringDeserializer.class);
+
+ assertEquals(3, consumerRecords.size());
+ List<Integer> partitions = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : consumerRecords) {
+ partitions.add(record.partition());
+ }
+
+ assertThat(partitions, hasItem(1));
+ assertThat(partitions, hasItem(2));
+ assertThat(partitions, hasItem(3));
+ }
+
+ @Test
+ public void testColumnForDeletion() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_with_column_for_deletion.yml");
+ configSource.set("brokers", ImmutableList.of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in_with_deletion.csv").getPath()));
+ List<ConsumerRecord<String, String>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("json-topic", StringDeserializer.class,
+ StringDeserializer.class);
+
+ assertEquals(3, consumerRecords.size());
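+ // A002 is flagged for deletion in in_with_deletion.csv, so its record is produced as a
+ // tombstone (null value).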
+ HashMap<String, String> recordMap = new HashMap<>();
+ consumerRecords.forEach(record -> recordMap.put(record.key(), record.value()));
+ assertNotNull(recordMap.get("A001"));
+ assertNotNull(recordMap.get("A003"));
+ assertNull(recordMap.get("A002"));
+ }
+
+ @Test
+ public void testColumnForDeletionAvro() throws IOException
+ {
+ ConfigSource configSource = embulk.loadYamlResource("config_with_column_for_deletion_avro.yml");
+ configSource.set("brokers", ImmutableList.of(sharedKafkaTestResource.getKafkaBrokers().getBrokerById(1).getConnectString()));
+ embulk.runOutput(configSource, Paths.get(Resources.getResource("in_with_deletion.csv").getPath()));
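+ // The values are Avro-encoded, but StringDeserializer suffices here: the test only
+ // distinguishes tombstones (null) from non-null values.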
+ List<ConsumerRecord<String, String>> consumerRecords = kafkaTestUtils
+ .consumeAllRecordsFromTopic("avro-simple-topic", StringDeserializer.class,
+ StringDeserializer.class);
+
+ assertEquals(3, consumerRecords.size());
+ HashMap<String, String> recordMap = new HashMap<>();
+ consumerRecords.forEach(record -> recordMap.put(record.key(), record.value()));
+ assertNotNull(recordMap.get("A001"));
+ assertNotNull(recordMap.get("A003"));
+ assertNull(recordMap.get("A002"));
+ }
}