src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.3 vs src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.4

- old
+ new

@@ -11,18 +11,29 @@ import java.time.format.DateTimeFormatter; public class JsonFormatColumnVisitor extends KafkaOutputColumnVisitor { private ObjectMapper objectMapper; - public ObjectNode jsonNode; + private ObjectNode jsonNode; private static DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_INSTANT; - public JsonFormatColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader, ObjectMapper objectMapper) + JsonFormatColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader, ObjectMapper objectMapper) { super(task, pageReader); this.objectMapper = objectMapper; + } + + ObjectNode getJsonNode() + { + return jsonNode; + } + + @Override + void reset() + { + super.reset(); this.jsonNode = objectMapper.createObjectNode(); } @Override public void booleanColumn(Column column) @@ -40,10 +51,12 @@ } @Override public void longColumn(Column column) { + super.longColumn(column); + if (isIgnoreColumn(column)) { return; } if (pageReader.isNull(column)) { @@ -56,36 +69,38 @@ } @Override public void doubleColumn(Column column) { + super.doubleColumn(column); + if (isIgnoreColumn(column)) { return; } if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } jsonNode.put(column.getName(), pageReader.getDouble(column)); - super.doubleColumn(column); } @Override public void stringColumn(Column column) { + super.stringColumn(column); + if (isIgnoreColumn(column)) { return; } if (pageReader.isNull(column)) { jsonNode.putNull(column.getName()); return; } jsonNode.put(column.getName(), pageReader.getString(column)); - super.stringColumn(column); } @Override public void timestampColumn(Column column) {