src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.8
- old
+ new
@@ -2,27 +2,30 @@
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.PageReader;
-public abstract class KafkaOutputColumnVisitor implements ColumnVisitor
+public abstract class KafkaOutputColumnVisitor<T> implements ColumnVisitor
{
private KafkaOutputPlugin.PluginTask task;
PageReader pageReader;
private String partitionColumnName;
private Object recordKey = null;
private String topicName = null;
private Integer partition = null;
+ private boolean deletion = false;
KafkaOutputColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader)
{
this.task = task;
this.pageReader = pageReader;
this.partitionColumnName = task.getPartitionColumnName().orElse(null);
}
+ public abstract T getRecord();
+
Object getRecordKey()
{
return recordKey;
}
@@ -48,19 +51,40 @@
Integer getPartition()
{
return partition;
}
+ boolean isDeletion()
+ {
+ return deletion;
+ }
+
void reset()
{
this.recordKey = null;
this.topicName = null;
this.partition = null;
+ this.deletion = false;
}
boolean isIgnoreColumn(Column column)
{
return task.getIgnoreColumns().stream().anyMatch(name -> name.equals(column.getName()));
+ }
+
+ boolean isColumnForDeletion(Column column)
+ {
+ return task.getColumnForDeletion().map(name -> name.equals(column.getName())).orElse(false);
+ }
+
+ @Override
+ public void booleanColumn(Column column)
+ {
+ if (!pageReader.isNull(column)) {
+ if (isColumnForDeletion(column)) {
+ deletion = pageReader.getBoolean(column);
+ }
+ }
}
@Override
public void longColumn(Column column)
{