src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.8

- old
+ new

@@ -8,11 +8,11 @@ import org.msgpack.value.Value; import java.io.IOException; import java.time.format.DateTimeFormatter; -public class JsonFormatColumnVisitor extends KafkaOutputColumnVisitor +public class JsonFormatColumnVisitor extends KafkaOutputColumnVisitor<ObjectNode> { private ObjectMapper objectMapper; private ObjectNode jsonNode; private static DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_INSTANT; @@ -21,12 +21,17 @@ { super(task, pageReader); this.objectMapper = objectMapper; } - ObjectNode getJsonNode() + @Override + public ObjectNode getRecord() { + if (isDeletion()) { + return null; + } + return jsonNode; } @Override void reset() @@ -36,9 +41,11 @@ } @Override public void booleanColumn(Column column) { + super.booleanColumn(column); + if (isIgnoreColumn(column)) { return; } if (pageReader.isNull(column)) {