src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.7 vs src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.8
- old
+ new
@@ -8,11 +8,11 @@
import org.msgpack.value.Value;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
-public class JsonFormatColumnVisitor extends KafkaOutputColumnVisitor
+public class JsonFormatColumnVisitor extends KafkaOutputColumnVisitor<ObjectNode>
{
private ObjectMapper objectMapper;
private ObjectNode jsonNode;
private static DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_INSTANT;
@@ -21,12 +21,17 @@
{
super(task, pageReader);
this.objectMapper = objectMapper;
}
- ObjectNode getJsonNode()
+ @Override
+ public ObjectNode getRecord()
{
+ if (isDeletion()) {
+ return null;
+ }
+
return jsonNode;
}
@Override
void reset()
@@ -36,9 +41,11 @@
}
@Override
public void booleanColumn(Column column)
{
+ super.booleanColumn(column);
+
if (isIgnoreColumn(column)) {
return;
}
if (pageReader.isNull(column)) {