src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.1 vs src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.2

- old
+ new

@@ -25,10 +25,14 @@ } @Override public void booleanColumn(Column column) { + if (isIgnoreColumn(column)) { + return; + } + if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } @@ -36,10 +40,14 @@ } @Override public void longColumn(Column column) { + if (isIgnoreColumn(column)) { + return; + } + if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } @@ -48,10 +56,14 @@ } @Override public void doubleColumn(Column column) { + if (isIgnoreColumn(column)) { + return; + } + if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } @@ -60,10 +72,14 @@ } @Override public void stringColumn(Column column) { + if (isIgnoreColumn(column)) { + return; + } + if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } @@ -72,10 +88,14 @@ } @Override public void timestampColumn(Column column) { + if (isIgnoreColumn(column)) { + return; + } + if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } @@ -83,9 +103,13 @@ } @Override public void jsonColumn(Column column) { + if (isIgnoreColumn(column)) { + return; + } + if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; }