src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.4.0 vs src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.4.1

- old
+ new

@@ -1,165 +1,165 @@ -package org.embulk.output; - -import java.util.List; -import java.util.Properties; -import java.io.IOException; -import java.sql.SQLException; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; -import org.embulk.output.jdbc.BatchInsert; -import org.embulk.output.postgresql.PostgreSQLOutputConnector; -import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert; - -import com.google.common.collect.ImmutableList; -import java.sql.Types; -import org.embulk.spi.Schema; -import org.embulk.spi.ColumnVisitor; -import org.embulk.spi.Column; -import org.embulk.output.jdbc.JdbcColumn; -import org.embulk.output.jdbc.JdbcSchema; - -public class PostgreSQLOutputPlugin - extends AbstractJdbcOutputPlugin -{ - public interface PostgreSQLPluginTask - extends PluginTask - { - @Config("host") - public String getHost(); - - @Config("port") - @ConfigDefault("5432") - public int getPort(); - - @Config("user") - public String getUser(); - - @Config("password") - @ConfigDefault("\"\"") - public String getPassword(); - - @Config("database") - public String getDatabase(); - - @Config("schema") - @ConfigDefault("\"public\"") - public String getSchema(); - } - - @Override - protected Class<? extends PluginTask> getTaskClass() - { - return PostgreSQLPluginTask.class; - } - - @Override - protected Features getFeatures(PluginTask task) - { - return new Features() - .setMaxTableNameLength(30) - .setSupportedModes(ImmutableSet.of(Mode.INSERT, Mode.INSERT_DIRECT, Mode.MERGE, Mode.TRUNCATE_INSERT, Mode.REPLACE)) - .setIgnoreMergeKeys(false); - } - - @Override - protected PostgreSQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) - { - PostgreSQLPluginTask t = (PostgreSQLPluginTask) task; - - String url = String.format("jdbc:postgresql://%s:%d/%s", - t.getHost(), t.getPort(), t.getDatabase()); - - Properties props = new Properties(); - props.setProperty("loginTimeout", "300"); // seconds - props.setProperty("socketTimeout", "1800"); // seconds - - // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. - // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. - props.setProperty("tcpKeepAlive", "true"); - - // TODO - //switch t.getSssl() { - //when "disable": - // break; - //when "enable": - // props.setProperty("sslfactory", "org.postgresql.ssl.NonValidatingFactory"); // disable server-side validation - //when "verify": - // props.setProperty("ssl", "true"); - // break; - //} - - if (!retryableMetadataOperation) { - // non-retryable batch operation uses longer timeout - props.setProperty("loginTimeout", "300"); // seconds - props.setProperty("socketTimeout", "28800"); // seconds - } - - props.putAll(t.getOptions()); - - props.setProperty("user", t.getUser()); - logger.info("Connecting to {} options {}", url, props); - props.setProperty("password", t.getPassword()); - - return new PostgreSQLOutputConnector(url, props, t.getSchema()); - } - - @Override - protected BatchInsert newBatchInsert(PluginTask task, Optional<List<String>> mergeKeys) throws IOException, SQLException - { - if (mergeKeys.isPresent()) { - throw new UnsupportedOperationException("PostgreSQL output plugin doesn't support 'merge_direct' mode. Use 'merge' mode instead."); - } - return new PostgreSQLCopyBatchInsert(getConnector(task, true)); - } - - // TODO This is almost copy from AbstractJdbcOutputPlugin excepting type of TIMESTAMP -> TIMESTAMP WITH TIME ZONE. - // AbstractJdbcOutputPlugin should have better extensibility. - @Override - protected JdbcSchema newJdbcSchemaForNewTable(Schema schema) - { - final ImmutableList.Builder<JdbcColumn> columns = ImmutableList.builder(); - for (Column c : schema.getColumns()) { - final String columnName = c.getName(); - c.visit(new ColumnVisitor() { - public void booleanColumn(Column column) - { - columns.add(JdbcColumn.newGenericTypeColumn( - columnName, Types.BOOLEAN, "BOOLEAN", - 1, 0, false, false)); - } - - public void longColumn(Column column) - { - columns.add(JdbcColumn.newGenericTypeColumn( - columnName, Types.BIGINT, "BIGINT", - 22, 0, false, false)); - } - - public void doubleColumn(Column column) - { - columns.add(JdbcColumn.newGenericTypeColumn( - columnName, Types.FLOAT, "DOUBLE PRECISION", - 24, 0, false, false)); - } - - public void stringColumn(Column column) - { - columns.add(JdbcColumn.newGenericTypeColumn( - columnName, Types.CLOB, "CLOB", - 4000, 0, false, false)); // TODO size type param - } - - public void timestampColumn(Column column) - { - columns.add(JdbcColumn.newGenericTypeColumn( - columnName, Types.TIMESTAMP, "TIMESTAMP WITH TIME ZONE", - 26, 0, false, false)); // size type param is from postgresql - } - }); - } - return new JdbcSchema(columns.build()); - } -} +package org.embulk.output; + +import java.util.List; +import java.util.Properties; +import java.io.IOException; +import java.sql.SQLException; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; +import org.embulk.output.jdbc.BatchInsert; +import org.embulk.output.postgresql.PostgreSQLOutputConnector; +import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert; + +import com.google.common.collect.ImmutableList; +import java.sql.Types; +import org.embulk.spi.Schema; +import org.embulk.spi.ColumnVisitor; +import org.embulk.spi.Column; +import org.embulk.output.jdbc.JdbcColumn; +import org.embulk.output.jdbc.JdbcSchema; + +public class PostgreSQLOutputPlugin + extends AbstractJdbcOutputPlugin +{ + public interface PostgreSQLPluginTask + extends PluginTask + { + @Config("host") + public String getHost(); + + @Config("port") + @ConfigDefault("5432") + public int getPort(); + + @Config("user") + public String getUser(); + + @Config("password") + @ConfigDefault("\"\"") + public String getPassword(); + + @Config("database") + public String getDatabase(); + + @Config("schema") + @ConfigDefault("\"public\"") + public String getSchema(); + } + + @Override + protected Class<? extends PluginTask> getTaskClass() + { + return PostgreSQLPluginTask.class; + } + + @Override + protected Features getFeatures(PluginTask task) + { + return new Features() + .setMaxTableNameLength(30) + .setSupportedModes(ImmutableSet.of(Mode.INSERT, Mode.INSERT_DIRECT, Mode.MERGE, Mode.TRUNCATE_INSERT, Mode.REPLACE)) + .setIgnoreMergeKeys(false); + } + + @Override + protected PostgreSQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) + { + PostgreSQLPluginTask t = (PostgreSQLPluginTask) task; + + String url = String.format("jdbc:postgresql://%s:%d/%s", + t.getHost(), t.getPort(), t.getDatabase()); + + Properties props = new Properties(); + props.setProperty("loginTimeout", "300"); // seconds + props.setProperty("socketTimeout", "1800"); // seconds + + // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. + // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. + props.setProperty("tcpKeepAlive", "true"); + + // TODO + //switch t.getSssl() { + //when "disable": + // break; + //when "enable": + // props.setProperty("sslfactory", "org.postgresql.ssl.NonValidatingFactory"); // disable server-side validation + //when "verify": + // props.setProperty("ssl", "true"); + // break; + //} + + if (!retryableMetadataOperation) { + // non-retryable batch operation uses longer timeout + props.setProperty("loginTimeout", "300"); // seconds + props.setProperty("socketTimeout", "28800"); // seconds + } + + props.putAll(t.getOptions()); + + props.setProperty("user", t.getUser()); + logger.info("Connecting to {} options {}", url, props); + props.setProperty("password", t.getPassword()); + + return new PostgreSQLOutputConnector(url, props, t.getSchema()); + } + + @Override + protected BatchInsert newBatchInsert(PluginTask task, Optional<List<String>> mergeKeys) throws IOException, SQLException + { + if (mergeKeys.isPresent()) { + throw new UnsupportedOperationException("PostgreSQL output plugin doesn't support 'merge_direct' mode. Use 'merge' mode instead."); + } + return new PostgreSQLCopyBatchInsert(getConnector(task, true)); + } + + // TODO This is almost copy from AbstractJdbcOutputPlugin excepting type of TIMESTAMP -> TIMESTAMP WITH TIME ZONE. + // AbstractJdbcOutputPlugin should have better extensibility. + @Override + protected JdbcSchema newJdbcSchemaForNewTable(Schema schema) + { + final ImmutableList.Builder<JdbcColumn> columns = ImmutableList.builder(); + for (Column c : schema.getColumns()) { + final String columnName = c.getName(); + c.visit(new ColumnVisitor() { + public void booleanColumn(Column column) + { + columns.add(JdbcColumn.newGenericTypeColumn( + columnName, Types.BOOLEAN, "BOOLEAN", + 1, 0, false, false)); + } + + public void longColumn(Column column) + { + columns.add(JdbcColumn.newGenericTypeColumn( + columnName, Types.BIGINT, "BIGINT", + 22, 0, false, false)); + } + + public void doubleColumn(Column column) + { + columns.add(JdbcColumn.newGenericTypeColumn( + columnName, Types.FLOAT, "DOUBLE PRECISION", + 24, 0, false, false)); + } + + public void stringColumn(Column column) + { + columns.add(JdbcColumn.newGenericTypeColumn( + columnName, Types.CLOB, "CLOB", + 4000, 0, false, false)); // TODO size type param + } + + public void timestampColumn(Column column) + { + columns.add(JdbcColumn.newGenericTypeColumn( + columnName, Types.TIMESTAMP, "TIMESTAMP WITH TIME ZONE", + 26, 0, false, false)); // size type param is from postgresql + } + }); + } + return new JdbcSchema(columns.build()); + } +}