src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.1 vs src/main/java/org/embulk/output/kafka/KafkaOutputColumnVisitor.java in embulk-output-kafka-0.1.2

- old
+ new

@@ -30,24 +30,35 @@ if (task.getTopicColumn().isPresent() && task.getTopicColumn().get().equals(column.getName())) { topicName = value; } } + boolean isIgnoreColumn(Column column) + { + return task.getIgnoreColumns().stream().anyMatch(name -> name.equals(column.getName())); + } + @Override public void longColumn(Column column) { - setRecordKey(column, pageReader.getLong(column)); + if (!pageReader.isNull(column)) { + setRecordKey(column, pageReader.getLong(column)); + } } @Override public void doubleColumn(Column column) { - setRecordKey(column, pageReader.getDouble(column)); + if (!pageReader.isNull(column)) { + setRecordKey(column, pageReader.getDouble(column)); + } } @Override public void stringColumn(Column column) { - setRecordKey(column, pageReader.getString(column)); - setTopicName(column, pageReader.getString(column)); + if (!pageReader.isNull(column)) { + setRecordKey(column, pageReader.getString(column)); + setTopicName(column, pageReader.getString(column)); + } } }