src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.3 vs src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.4

- old
+ new

@@ -4,45 +4,75 @@ import org.embulk.spi.ColumnVisitor; import org.embulk.spi.PageReader; public abstract class KafkaOutputColumnVisitor implements ColumnVisitor { - KafkaOutputPlugin.PluginTask task; + private KafkaOutputPlugin.PluginTask task; PageReader pageReader; + private String partitionColumnName; - public Object recordKey = null; - public String topicName = null; + private Object recordKey = null; + private String topicName = null; + private Integer partition = null; - public KafkaOutputColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader) + KafkaOutputColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader) { this.task = task; this.pageReader = pageReader; + this.partitionColumnName = task.getPartitionColumnName().orElse(null); } - void setRecordKey(Column column, Object value) + Object getRecordKey() { + return recordKey; + } + + private void setRecordKey(Column column, Object value) + { if (task.getKeyColumnName().isPresent() && task.getKeyColumnName().get().equals(column.getName())) { recordKey = value; } } - void setTopicName(Column column, String value) + String getTopicName() { + return topicName; + } + + private void setTopicName(Column column, String value) + { if (task.getTopicColumn().isPresent() && task.getTopicColumn().get().equals(column.getName())) { topicName = value; } } + Integer getPartition() + { + return partition; + } + + void reset() + { + this.recordKey = null; + this.topicName = null; + this.partition = null; + } + boolean isIgnoreColumn(Column column) { return task.getIgnoreColumns().stream().anyMatch(name -> name.equals(column.getName())); } @Override public void longColumn(Column column) { if (!pageReader.isNull(column)) { - setRecordKey(column, pageReader.getLong(column)); + long value = pageReader.getLong(column); + setRecordKey(column, value); + + if (partitionColumnName != null && partitionColumnName.equals(column.getName())) { + partition = Long.valueOf(value).intValue(); + } } } @Override public void doubleColumn(Column column)