src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.1 vs src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.2
- old
+ new
@@ -30,24 +30,35 @@
if (task.getTopicColumn().isPresent() && task.getTopicColumn().get().equals(column.getName())) {
topicName = value;
}
}
+ boolean isIgnoreColumn(Column column)
+ {
+ return task.getIgnoreColumns().stream().anyMatch(name -> name.equals(column.getName()));
+ }
+
@Override
public void longColumn(Column column)
{
- setRecordKey(column, pageReader.getLong(column));
+ if (!pageReader.isNull(column)) {
+ setRecordKey(column, pageReader.getLong(column));
+ }
}
@Override
public void doubleColumn(Column column)
{
- setRecordKey(column, pageReader.getDouble(column));
+ if (!pageReader.isNull(column)) {
+ setRecordKey(column, pageReader.getDouble(column));
+ }
}
@Override
public void stringColumn(Column column)
{
- setRecordKey(column, pageReader.getString(column));
- setTopicName(column, pageReader.getString(column));
+ if (!pageReader.isNull(column)) {
+ setRecordKey(column, pageReader.getString(column));
+ setTopicName(column, pageReader.getString(column));
+ }
}
}