src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.3 vs src/main/java/org/embulk/output/kafka/JsonFormatColumnVisitor.java in embulk-output-kafka-0.1.4
- old
+ new
@@ -11,18 +11,29 @@
import java.time.format.DateTimeFormatter;
public class JsonFormatColumnVisitor extends KafkaOutputColumnVisitor
{
private ObjectMapper objectMapper;
- public ObjectNode jsonNode;
+ private ObjectNode jsonNode;
private static DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_INSTANT;
- public JsonFormatColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader, ObjectMapper objectMapper)
+ JsonFormatColumnVisitor(KafkaOutputPlugin.PluginTask task, PageReader pageReader, ObjectMapper objectMapper)
{
super(task, pageReader);
this.objectMapper = objectMapper;
+ }
+
+ ObjectNode getJsonNode()
+ {
+ return jsonNode;
+ }
+
+ @Override
+ void reset()
+ {
+ super.reset();
this.jsonNode = objectMapper.createObjectNode();
}
@Override
public void booleanColumn(Column column)
@@ -40,10 +51,12 @@
}
@Override
public void longColumn(Column column)
{
+ super.longColumn(column);
+
if (isIgnoreColumn(column)) {
return;
}
if (pageReader.isNull(column)) {
@@ -56,36 +69,38 @@
}
@Override
public void doubleColumn(Column column)
{
+ super.doubleColumn(column);
+
if (isIgnoreColumn(column)) {
return;
}
if (pageReader.isNull(column)) {
jsonNode.putNull(column.getName());
return;
}
jsonNode.put(column.getName(), pageReader.getDouble(column));
- super.doubleColumn(column);
}
@Override
public void stringColumn(Column column)
{
+ super.stringColumn(column);
+
if (isIgnoreColumn(column)) {
return;
}
if (pageReader.isNull(column)) {
jsonNode.putNull(column.getName());
return;
}
jsonNode.put(column.getName(), pageReader.getString(column));
- super.stringColumn(column);
}
@Override
public void timestampColumn(Column column)
{