src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.1.2 vs src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.2.0

- old
+ new

@@ -2,39 +2,71 @@ import java.util.Properties; import java.io.IOException; import java.sql.SQLException; import org.embulk.spi.Exec; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; import org.embulk.output.jdbc.BatchInsert; import org.embulk.output.postgresql.PostgreSQLOutputConnector; import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert; public class PostgreSQLOutputPlugin extends AbstractJdbcOutputPlugin { - private static final String DEFAULT_SCHEMA = "public"; - private static final int DEFAULT_PORT = 5432; + public interface PostgreSQLPluginTask + extends PluginTask + { + @Config("host") + public String getHost(); + @Config("port") + @ConfigDefault("5432") + public int getPort(); + + @Config("user") + public String getUser(); + + @Config("password") + @ConfigDefault("\"\"") + public String getPassword(); + + @Config("database") + public String getDatabase(); + + @Config("schema") + @ConfigDefault("\"public\"") + public String getSchema(); + } + @Override + protected Class<? extends PluginTask> getTaskClass() + { + return PostgreSQLPluginTask.class; + } + + @Override protected PostgreSQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) { + PostgreSQLPluginTask t = (PostgreSQLPluginTask) task; + String url = String.format("jdbc:postgresql://%s:%d/%s", - task.getHost(), task.getPort().or(DEFAULT_PORT), task.getDatabase()); + t.getHost(), t.getPort(), t.getDatabase()); Properties props = new Properties(); - props.setProperty("user", task.getUser()); - props.setProperty("password", task.getPassword()); + props.setProperty("user", t.getUser()); + props.setProperty("password", t.getPassword()); props.setProperty("loginTimeout", "300"); // seconds props.setProperty("socketTimeout", "1800"); // seconds // Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters. // Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable. props.setProperty("tcpKeepAlive", "true"); // TODO - //switch task.getSssl() { + //switch t.getSssl() { //when "disable": // break; //when "enable": // props.setProperty("sslfactory", "org.postgresql.ssl.NonValidatingFactory"); // disable server-side validation //when "verify": @@ -46,12 +78,12 @@ // non-retryable batch operation uses longer timeout props.setProperty("loginTimeout", "300"); // seconds props.setProperty("socketTimeout", "28800"); // seconds } - props.putAll(task.getOptions()); + props.putAll(t.getOptions()); - return new PostgreSQLOutputConnector(url, props, task.getSchema().or(DEFAULT_SCHEMA)); + return new PostgreSQLOutputConnector(url, props, t.getSchema()); } @Override protected BatchInsert newBatchInsert(PluginTask task) throws IOException, SQLException {