src/main/java/org/embulk/input/postgresql/getter/PostgreSQLColumnGetterFactory.java in embulk-input-postgresql-0.8.0 vs src/main/java/org/embulk/input/postgresql/getter/PostgreSQLColumnGetterFactory.java in embulk-input-postgresql-0.8.1

- old
+ new

@@ -1,36 +1,57 @@ package org.embulk.input.postgresql.getter; +import org.embulk.input.jdbc.AbstractJdbcInputPlugin.PluginTask; import org.embulk.input.jdbc.JdbcColumn; import org.embulk.input.jdbc.JdbcColumnOption; +import org.embulk.input.jdbc.JdbcInputConnection; import org.embulk.input.jdbc.getter.ColumnGetter; import org.embulk.input.jdbc.getter.ColumnGetterFactory; +import org.embulk.input.jdbc.getter.TimestampWithTimeZoneIncrementalHandler; +import org.embulk.input.jdbc.getter.TimestampWithoutTimeZoneIncrementalHandler; import org.embulk.spi.PageBuilder; +import org.embulk.spi.type.Types; import org.joda.time.DateTimeZone; public class PostgreSQLColumnGetterFactory extends ColumnGetterFactory { public PostgreSQLColumnGetterFactory(PageBuilder to, DateTimeZone defaultTimeZone) { super(to, defaultTimeZone); } @Override - public ColumnGetter newColumnGetter(JdbcColumn column, JdbcColumnOption option) + public ColumnGetter newColumnGetter(JdbcInputConnection con, PluginTask task, JdbcColumn column, JdbcColumnOption option) { - if (column.getTypeName().equals("hstore")) { - return new HstoreColumnGetter(to, getToType(option)); - } else { - return super.newColumnGetter(column, option); + if (column.getTypeName().equals("hstore") && getToType(option) == Types.JSON) { + // converting hstore to json needs a special handling + return new HstoreToJsonColumnGetter(to, Types.JSON); } + + ColumnGetter getter = super.newColumnGetter(con, task, column, option); + + // incremental loading wrapper + switch (column.getTypeName()) { + case "timestamptz": + return new TimestampWithTimeZoneIncrementalHandler(getter); + case "timestamp": + return new TimestampWithoutTimeZoneIncrementalHandler(getter); + default: + return getter; + } } @Override protected String sqlTypeToValueType(JdbcColumn column, int sqlType) { - if (column.getTypeName().equals("json") || column.getTypeName().equals("jsonb")) { + switch(column.getTypeName()) { + case "json": + case "jsonb": return "json"; - } else { + case "hstore": + // hstore is converted to string by default + return "string"; + default: return super.sqlTypeToValueType(column, sqlType); } } }