src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.8

- old
+ new

@@ -2,27 +2,30 @@ import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; import org.embulk.spi.PageReader; -public abstract class KafkaOutputColumnVisitor implements ColumnVisitor +public abstract class KafkaOutputColumnVisitor<T> implements ColumnVisitor { private KafkaOutputPlugin.PluginTask task; PageReader pageReader; private String partitionColumnName; private Object recordKey = null; private String topicName = null; private Integer partition = null; + private boolean deletion = false; KafkaOutputColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader) { this.task = task; this.pageReader = pageReader; this.partitionColumnName = task.getPartitionColumnName().orElse(null); } + public abstract T getRecord(); + Object getRecordKey() { return recordKey; } @@ -48,19 +51,40 @@ Integer getPartition() { return partition; } + boolean isDeletion() + { + return deletion; + } + void reset() { this.recordKey = null; this.topicName = null; this.partition = null; + this.deletion = false; } boolean isIgnoreColumn(Column column) { return task.getIgnoreColumns().stream().anyMatch(name -> name.equals(column.getName())); + } + + boolean isColumnForDeletion(Column column) + { + return task.getColumnForDeletion().map(name -> name.equals(column.getName())).orElse(false); + } + + @Override + public void booleanColumn(Column column) + { + if (!pageReader.isNull(column)) { + if (isColumnForDeletion(column)) { + deletion = pageReader.getBoolean(column); + } + } } @Override public void longColumn(Column column) {