src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.3 vs src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.4
- old
+ new
@@ -4,45 +4,75 @@
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.PageReader;
public abstract class KafkaOutputColumnVisitor implements ColumnVisitor
{
- KafkaOutputPlugin.PluginTask task;
+ private KafkaOutputPlugin.PluginTask task;
PageReader pageReader;
+ private String partitionColumnName;
- public Object recordKey = null;
- public String topicName = null;
+ private Object recordKey = null;
+ private String topicName = null;
+ private Integer partition = null;
- public KafkaOutputColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader)
+ KafkaOutputColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader)
{
this.task = task;
this.pageReader = pageReader;
+ this.partitionColumnName = task.getPartitionColumnName().orElse(null);
}
- void setRecordKey(Column column, Object value)
+ Object getRecordKey()
{
+ return recordKey;
+ }
+
+ private void setRecordKey(Column column, Object value)
+ {
if (task.getKeyColumnName().isPresent() && task.getKeyColumnName().get().equals(column.getName())) {
recordKey = value;
}
}
- void setTopicName(Column column, String value)
+ String getTopicName()
{
+ return topicName;
+ }
+
+ private void setTopicName(Column column, String value)
+ {
if (task.getTopicColumn().isPresent() && task.getTopicColumn().get().equals(column.getName())) {
topicName = value;
}
}
+ Integer getPartition()
+ {
+ return partition;
+ }
+
+ void reset()
+ {
+ this.recordKey = null;
+ this.topicName = null;
+ this.partition = null;
+ }
+
boolean isIgnoreColumn(Column column)
{
return task.getIgnoreColumns().stream().anyMatch(name -> name.equals(column.getName()));
}
@Override
public void longColumn(Column column)
{
if (!pageReader.isNull(column)) {
- setRecordKey(column, pageReader.getLong(column));
+ long value = pageReader.getLong(column);
+ setRecordKey(column, value);
+
+ if (partitionColumnName != null && partitionColumnName.equals(column.getName())) {
+ partition = Long.valueOf(value).intValue();
+ }
}
}
@Override
public void doubleColumn(Column column)