src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.2.4 vs src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.3.0

- old
+ new

@@ -1,142 +1,109 @@ -package org.embulk.output; - -import java.util.List; -import java.util.Properties; -import java.io.IOException; -import java.sql.SQLException; - -import org.embulk.output.jdbc.setter.ColumnSetter; -import org.embulk.output.postgresql.PostgresqlBatchUpsert; -import org.embulk.spi.Exec; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; -import org.embulk.output.jdbc.BatchInsert; -import org.embulk.output.postgresql.PostgreSQLOutputConnector; -import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert; -import org.embulk.spi.PageReader; - -public class PostgreSQLOutputPlugin - extends AbstractJdbcOutputPlugin -{ - public interface PostgreSQLPluginTask - extends PluginTask - { - @Config("host") - public String getHost(); - - @Config("port") - @ConfigDefault("5432") - public int getPort(); - - @Config("user") - public String getUser(); - - @Config("password") - @ConfigDefault("\"\"") - public String getPassword(); - - @Config("database") - public String getDatabase(); - - @Config("schema") - @ConfigDefault("\"public\"") - public String getSchema(); - } - - @Override - protected Class<? extends PluginTask> getTaskClass() - { - return PostgreSQLPluginTask.class; - } - - @Override - protected PostgreSQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) - { - PostgreSQLPluginTask t = (PostgreSQLPluginTask) task; - - String url = String.format("jdbc:postgresql://%s:%d/%s", - t.getHost(), t.getPort(), t.getDatabase()); - - Properties props = new Properties(); - props.setProperty("user", t.getUser()); - props.setProperty("password", t.getPassword()); - props.setProperty("loginTimeout", "300"); // seconds - props.setProperty("socketTimeout", "1800"); // seconds - - // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. - // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. - props.setProperty("tcpKeepAlive", "true"); - - // TODO - //switch t.getSssl() { - //when "disable": - // break; - //when "enable": - // props.setProperty("sslfactory", "org.postgresql.ssl.NonValidatingFactory"); // disable server-side validation - //when "verify": - // props.setProperty("ssl", "true"); - // break; - //} - - if (!retryableMetadataOperation) { - // non-retryable batch operation uses longer timeout - props.setProperty("loginTimeout", "300"); // seconds - props.setProperty("socketTimeout", "28800"); // seconds - } - - props.putAll(t.getOptions()); - - return new PostgreSQLOutputConnector(url, props, t.getSchema()); - } - - @Override - protected PluginPageOutput newPluginPageOutput(PageReader reader, - BatchInsert batch, List<ColumnSetter> columnSetters, - PluginTask task) - { - if (task.getMode().isMerge()) { - return new PostgresPluginPageOutput(reader, batch, columnSetters, task.getBatchSize()); - } - return super.newPluginPageOutput(reader, batch, columnSetters, task); - } - - public static class PostgresPluginPageOutput extends PluginPageOutput - { - - public PostgresPluginPageOutput(PageReader pageReader, BatchInsert batch, List<ColumnSetter> columnSetters, int batchSize) - { - super(pageReader, batch, columnSetters, batchSize); - } - - @Override - protected void handleColumnsSetters() - { - int size = columnSetters.size(); - for (int i=0; i < size; i++) { - ColumnSetter columnSetter = columnSetters.get(i); - if (!columnSetter.getColumn().isPrimaryKey()) { - columns.get(i).visit(columnSetter); - } - } - for (int i=0; i < size; i++) { - ColumnSetter columnSetter = columnSetters.get(i); - if (columnSetter.getColumn().isPrimaryKey()) { - columns.get(i).visit(columnSetter); - } - } - for (int i=0; i < size; i++) { - columns.get(i).visit(columnSetters.get(i)); - } - } - - } - - @Override - protected BatchInsert newBatchInsert(PluginTask task) throws IOException, SQLException - { - PostgreSQLOutputConnector connector = getConnector(task, true); - return task.getMode().isMerge() ? new PostgresqlBatchUpsert(connector) : - new PostgreSQLCopyBatchInsert(getConnector(task, true)); - } -} +package org.embulk.output; + +import java.util.List; +import java.util.Properties; +import java.io.IOException; +import java.sql.SQLException; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; +import org.embulk.output.jdbc.BatchInsert; +import org.embulk.output.postgresql.PostgreSQLOutputConnector; +import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert; + +public class PostgreSQLOutputPlugin + extends AbstractJdbcOutputPlugin +{ + public interface PostgreSQLPluginTask + extends PluginTask + { + @Config("host") + public String getHost(); + + @Config("port") + @ConfigDefault("5432") + public int getPort(); + + @Config("user") + public String getUser(); + + @Config("password") + @ConfigDefault("\"\"") + public String getPassword(); + + @Config("database") + public String getDatabase(); + + @Config("schema") + @ConfigDefault("\"public\"") + public String getSchema(); + } + + @Override + protected Class<? extends PluginTask> getTaskClass() + { + return PostgreSQLPluginTask.class; + } + + @Override + protected Features getFeatures(PluginTask task) + { + return new Features() + .setMaxTableNameLength(30) + .setSupportedModes(ImmutableSet.of(Mode.INSERT, Mode.INSERT_DIRECT, Mode.MERGE, Mode.TRUNCATE_INSERT, Mode.REPLACE)) + .setIgnoreMergeKeys(false); + } + + @Override + protected PostgreSQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) + { + PostgreSQLPluginTask t = (PostgreSQLPluginTask) task; + + String url = String.format("jdbc:postgresql://%s:%d/%s", + t.getHost(), t.getPort(), t.getDatabase()); + + Properties props = new Properties(); + props.setProperty("loginTimeout", "300"); // seconds + props.setProperty("socketTimeout", "1800"); // seconds + + // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. + // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. + props.setProperty("tcpKeepAlive", "true"); + + // TODO + //switch t.getSssl() { + //when "disable": + // break; + //when "enable": + // props.setProperty("sslfactory", "org.postgresql.ssl.NonValidatingFactory"); // disable server-side validation + //when "verify": + // props.setProperty("ssl", "true"); + // break; + //} + + if (!retryableMetadataOperation) { + // non-retryable batch operation uses longer timeout + props.setProperty("loginTimeout", "300"); // seconds + props.setProperty("socketTimeout", "28800"); // seconds + } + + props.putAll(t.getOptions()); + + props.setProperty("user", t.getUser()); + logger.info("Connecting to {} options {}", url, props); + props.setProperty("password", t.getPassword()); + + return new PostgreSQLOutputConnector(url, props, t.getSchema()); + } + + @Override + protected BatchInsert newBatchInsert(PluginTask task, Optional<List<String>> mergeKeys) throws IOException, SQLException + { + if (mergeKeys.isPresent()) { + throw new UnsupportedOperationException("PostgreSQL output plugin doesn't support 'merge_direct' mode. Use 'merge' mode instead."); + } + return new PostgreSQLCopyBatchInsert(getConnector(task, true)); + } +}