package org.embulk.formatter.single_value; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import org.embulk.config.Config; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; import org.embulk.spi.FileOutput; import org.embulk.spi.FormatterPlugin; import org.embulk.spi.Page; import org.embulk.spi.PageOutput; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampFormatter; import org.embulk.spi.util.LineEncoder; import org.joda.time.DateTimeZone; import org.jruby.embed.ScriptingContainer; import org.msgpack.value.Value; public class SingleValueFormatterPlugin implements FormatterPlugin { public interface PluginTask extends Task, LineEncoder.EncoderTask, TimestampFormatter.Task { @Config("column_name") @ConfigDefault("null") public Optional getColumnName(); @Config("null_string") @ConfigDefault("\"\"") String getNullString(); @Config("timezone") @ConfigDefault("\"UTC\"") public String getTimezone(); @Config("timestamp_format") @ConfigDefault("\"%Y-%m-%d %H:%M:%S.%6N %z\"") public String getTimestampFormat(); } @Override public void transaction(ConfigSource config, Schema schema, FormatterPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); control.run(task.dump()); } private int getInputColumnIndex(Optional columnName, Schema inputSchema) { if (columnName.isPresent()) { return inputSchema.lookupColumn(columnName.get()).getIndex(); } return 0; // default is the first column } private Schema getOutputSchema(int inputColumnIndex, Schema inputSchema) { Column outputColumn = inputSchema.getColumn(inputColumnIndex); ImmutableList.Builder builder = ImmutableList.builder(); builder.add(outputColumn); return new Schema(builder.build()); } @Override public PageOutput open(final TaskSource taskSource, final Schema inputSchema, final FileOutput output) { final PluginTask task = taskSource.loadTask(PluginTask.class); final LineEncoder encoder = new LineEncoder(output, task); final String nullString = task.getNullString(); final int inputColumnIndex = getInputColumnIndex(task.getColumnName(), inputSchema); final Schema outputSchema = getOutputSchema(inputColumnIndex, inputSchema); final DateTimeZone timezone = DateTimeZone.forID(task.getTimezone()); final TimestampFormatter timestampFormatter = getTimestampFormatter(task.getTimestampFormat(), timezone); // create a file encoder.nextFile(); return new PageOutput() { private final PageReader pageReader = new PageReader(inputSchema); public void add(Page page) { pageReader.setPage(page); while (pageReader.nextRecord()) { outputSchema.visitColumns(new ColumnVisitor() { public void booleanColumn(Column column) { if (!pageReader.isNull(inputColumnIndex)) { addValue(Boolean.toString(pageReader.getBoolean(inputColumnIndex))); } else { addNullString(); } } public void longColumn(Column column) { if (!pageReader.isNull(inputColumnIndex)) { addValue(Long.toString(pageReader.getLong(inputColumnIndex))); } else { addNullString(); } } public void doubleColumn(Column column) { if (!pageReader.isNull(inputColumnIndex)) { addValue(Double.toString(pageReader.getDouble(inputColumnIndex))); } else { addNullString(); } } public void stringColumn(Column column) { if (!pageReader.isNull(inputColumnIndex)) { addValue(pageReader.getString(inputColumnIndex)); } else { addNullString(); } } public void timestampColumn(Column column) { if (!pageReader.isNull(inputColumnIndex)) { Timestamp value = pageReader.getTimestamp(inputColumnIndex); addValue(timestampFormatter.format(value)); } else { addNullString(); } } public void jsonColumn(Column column) { if (!pageReader.isNull(inputColumnIndex)) { Value value = pageReader.getJson(inputColumnIndex); addValue(value.toJson()); } else { addNullString(); } } private void addValue(String v) { encoder.addText(v); } private void addNullString() { encoder.addText(nullString); } }); encoder.addNewLine(); } } public void finish() { encoder.finish(); } public void close() { encoder.close(); } }; } private class TimestampFormatterTaskImpl implements TimestampFormatter.Task { private final DateTimeZone defaultTimeZone; private final String defaultTimestampFormat; public TimestampFormatterTaskImpl( DateTimeZone defaultTimeZone, String defaultTimestampFormat) { this.defaultTimeZone = defaultTimeZone; this.defaultTimestampFormat = defaultTimestampFormat; } @Override public DateTimeZone getDefaultTimeZone() { return this.defaultTimeZone; } @Override public String getDefaultTimestampFormat() { return this.defaultTimestampFormat; } @Override public ScriptingContainer getJRuby() { return null; } } private class TimestampFormatterColumnOptionImpl implements TimestampFormatter.TimestampColumnOption { private final Optional timeZone; private final Optional format; public TimestampFormatterColumnOptionImpl( Optional timeZone, Optional format) { this.timeZone = timeZone; this.format = format; } @Override public Optional getTimeZone() { return this.timeZone; } @Override public Optional getFormat() { return this.format; } } private TimestampFormatter getTimestampFormatter(String format, DateTimeZone timezone) { // ToDo: Use following codes after deciding to drop supporting embulk < 0.8.29. // // return new TimestampFormatter(format, timezone); TimestampFormatterTaskImpl task = new TimestampFormatterTaskImpl( timezone, format); TimestampFormatterColumnOptionImpl columnOption = new TimestampFormatterColumnOptionImpl( Optional.of(timezone), Optional.of(format)); return new TimestampFormatter(task, Optional.of(columnOption)); } }