src/main/java/org/embulk/output/SalesforceOutputPlugin.java in embulk-output-salesforce-0.1.2 vs src/main/java/org/embulk/output/SalesforceOutputPlugin.java in embulk-output-salesforce-0.1.3
- old
+ new
@@ -1,533 +1,533 @@
-package org.embulk.output;
-
-import com.google.common.base.Optional;
-import com.sforce.soap.partner.Connector;
-import com.sforce.soap.partner.DeleteResult;
-import com.sforce.soap.partner.DescribeSObjectResult;
-import com.sforce.soap.partner.Field;
-import com.sforce.soap.partner.FieldType;
-import com.sforce.soap.partner.GetUserInfoResult;
-import com.sforce.soap.partner.PartnerConnection;
-import com.sforce.soap.partner.SaveResult;
-import com.sforce.soap.partner.UpsertResult;
-import com.sforce.soap.partner.fault.ApiFault;
-import com.sforce.soap.partner.sobject.SObject;
-import com.sforce.ws.ConnectionException;
-import com.sforce.ws.ConnectorConfig;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.embulk.config.CommitReport;
-import org.embulk.config.Config;
-import org.embulk.config.ConfigDefault;
-import org.embulk.config.ConfigDiff;
-import org.embulk.config.ConfigException;
-import org.embulk.config.ConfigSource;
-import org.embulk.config.Task;
-import org.embulk.config.TaskSource;
-import org.embulk.spi.Column;
-import org.embulk.spi.ColumnVisitor;
-import org.embulk.spi.Exec;
-import org.embulk.spi.OutputPlugin;
-import org.embulk.spi.Page;
-import org.embulk.spi.PageReader;
-import org.embulk.spi.Schema;
-import org.embulk.spi.TransactionalPageOutput;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.supercsv.cellprocessor.ift.CellProcessor;
-import org.supercsv.io.CsvListWriter;
-import org.supercsv.io.ICsvListWriter;
-import org.supercsv.prefs.CsvPreference;
-
-public class SalesforceOutputPlugin
- implements OutputPlugin
-{
- protected static Logger logger;
- private static PartnerConnection client = null;
- private static Map<String, String> externalIdToObjectNameMap = null;
-
- public interface PluginTask
- extends Task
- {
- @Config("username")
- public String getUsername();
-
- @Config("password")
- public String getPassword();
-
- @Config("login_endpoint")
- @ConfigDefault("\"https://login.salesforce.com\"")
- public Optional<String> getLoginEndpoint();
-
- @Config("sobject")
- public String getSObject();
-
- @Config("upsert_key")
- @ConfigDefault("null")
- public Optional<String> getUpsertKey();
-
- @Config("batch_size")
- @ConfigDefault("200")
- public Integer getBatchSize();
-
- @Config("action")
- @ConfigDefault("\"insert\"")
- public Optional<String> getAction();
-
- @Config("version")
- @ConfigDefault("34.0")
- public Optional<String> getVersion();
-
- @Config("result_dir")
- @ConfigDefault("null")
- public Optional<String> getResultDir();
- }
-
- @Override
- public ConfigDiff transaction(ConfigSource config,
- Schema schema, int taskCount,
- OutputPlugin.Control control)
- {
- PluginTask task = config.loadConfig(PluginTask.class);
- logger = Exec.getLogger(getClass());
-
- if (task.getResultDir().isPresent() && task.getResultDir().get() != null) {
- File resultDir = new File(task.getResultDir().get());
- if (!resultDir.exists() || !resultDir.isDirectory()) {
- logger.error("{} is not exist or is not directory.", task.getResultDir().get());
- throw new RuntimeException(task.getResultDir().get() + " is not exist or is not directory.");
- }
- }
-
- final String username = task.getUsername();
- final String password = task.getPassword();
- final String loginEndpoint = task.getLoginEndpoint().get();
- try {
- if (client == null) {
- ConnectorConfig connectorConfig = new ConnectorConfig();
- connectorConfig.setUsername(username);
- connectorConfig.setPassword(password);
- connectorConfig.setAuthEndpoint(loginEndpoint + "/services/Soap/u/" +task.getVersion().get() + "/");
-
- client = Connector.newConnection(connectorConfig);
- GetUserInfoResult userInfo = client.getUserInfo();
- logger.info("login successful with {}", userInfo.getUserName());
- externalIdToObjectNameMap = new HashMap<>();
- DescribeSObjectResult describeResult = client.describeSObject(task.getSObject());
- for (Field field : describeResult.getFields()) {
- if (field.getType() == FieldType.reference) {
- externalIdToObjectNameMap.put(field.getRelationshipName(), field.getReferenceTo()[0]);
- }
- }
- }
- } catch(ConnectionException ex) {
- logger.error("Login error. Please check your credentials.");
- throw new RuntimeException(ex);
- }
-
- control.run(task.dump());
- return Exec.newConfigDiff();
- }
-
- @Override
- public ConfigDiff resume(TaskSource taskSource,
- Schema schema, int taskCount,
- OutputPlugin.Control control)
- {
- throw new UnsupportedOperationException("salesforce output plugin does not support resuming");
- }
-
- @Override
- public void cleanup(TaskSource taskSource,
- Schema schema, int taskCount,
- List<CommitReport> successCommitReports)
- {
- logger.info("logout");
- try {
- if (client != null) {
- client.logout();
- }
- } catch (ConnectionException ex) {}
- }
-
- @Override
- public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex)
- {
- PluginTask task = taskSource.loadTask(PluginTask.class);
-
- PageReader reader = new PageReader(schema);
- return new SalesforcePageOutput(reader, client, task);
- }
-
- public class SalesforcePageOutput
- implements TransactionalPageOutput
- {
- private final String dateSuffix = new SimpleDateFormat("yyyyMMddhhmmssSSS").format(new Date());
-
- private final PageReader pageReader;
- private final PartnerConnection client;
- private final int batchSize;
- private List<SObject> records;
- private final String upsertKey;
- private final String sobject;
- private final String action;
- private final String resultDir;
- private Integer numOfSuccess = 0;
- private Integer numOfError = 0;
-
- public SalesforcePageOutput(final PageReader pageReader,
- PartnerConnection client, PluginTask task)
- {
- this.pageReader = pageReader;
- this.client = client;
- this.batchSize = task.getBatchSize();
- this.upsertKey = task.getUpsertKey().isPresent() ? task.getUpsertKey().get() : null;
- this.sobject = task.getSObject();
- this.action = task.getAction().isPresent() ? task.getAction().get() : null;
- this.resultDir = task.getResultDir().isPresent() ? task.getResultDir().get() : null;
- this.records = new ArrayList<>();
- }
-
- @Override
- public void add(Page page)
- {
- try {
- pageReader.setPage(page);
- while (pageReader.nextRecord()) {
- final SObject record = new SObject();
- record.setType(this.sobject);
-
- pageReader.getSchema().visitColumns(new ColumnVisitor() {
- @Override
- public void doubleColumn(Column column) {
- columnWithReferenceCheck(column.getName(), pageReader.getDouble(column));
- }
- @Override
- public void timestampColumn(Column column) {
- DateTime dt = new DateTime(pageReader.getTimestamp(column).getEpochSecond()*1000);
- Calendar cal = Calendar.getInstance();
- cal.clear();
- cal.setTimeZone(dt.getZone().toTimeZone());
- cal.set(dt.getYear(), dt.getMonthOfYear()-1, dt.getDayOfMonth(),
- dt.getHourOfDay(), dt.getMinuteOfHour(), dt.getSecondOfMinute());
- record.addField(column.getName(), cal);
- }
- @Override
- public void stringColumn(Column column) {
- columnWithReferenceCheck(column.getName(), pageReader.getString(column));
- }
- @Override
- public void longColumn(Column column) {
- columnWithReferenceCheck(column.getName(), pageReader.getLong(column));
- }
- @Override
- public void booleanColumn(Column column) {
- record.addField(column.getName(), pageReader.getBoolean(column));
- }
-
- private void columnWithReferenceCheck(String name, Object value) {
- if (name.indexOf('.') > 0) {
- String[] tokens = name.split("\\.");
- String referencesFieldName = tokens[0];
- String externalIdFieldName = tokens[1];
-
- SObject sObjRef = new SObject();
- String refFieldApiName = referencesFieldName.replaceAll("__R", "__r");
- if (externalIdToObjectNameMap.containsKey(refFieldApiName)) {
- sObjRef.setType(externalIdToObjectNameMap.get(refFieldApiName));
- } else {
- throw new ConfigException("Invalid Relationship Name '" + refFieldApiName + "'");
- }
- sObjRef.addField(externalIdFieldName, value);
- record.addField(referencesFieldName, sObjRef);
- } else {
- record.addField(name, value);
- }
- }
-
- });
- this.records.add(record);
-
- if (this.records.size() >= this.batchSize) {
- this.action(this.records);
- logger.info("Number of processed records: {}", this.numOfSuccess + this.numOfError);
- }
- }
-
- if (!this.records.isEmpty()) {
- this.action(this.records);
- logger.info("Number of processed records: {}", this.numOfSuccess + this.numOfError);
- }
- } catch (ConfigException ex) {
- logger.error("Configuration Error: {}", ex.getMessage());
- } catch (ApiFault ex) {
- logger.error("API Error: {}", ex.getExceptionMessage());
- } catch (ConnectionException ex) {
- logger.error("Connection Error: {}", ex.getMessage());
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void finish()
- {
-
- }
-
- @Override
- public void close()
- {
-
- }
-
- @Override
- public void abort()
- {
- }
-
- @Override
- public CommitReport commit()
- {
- return Exec.newCommitReport();
- }
-
- private void action(List<SObject> records) throws ConnectionException, IOException{
- switch(this.action){
- case "insert":
- SaveResult[] insertResults = client.create(
- (SObject[])this.records.toArray(new SObject[0]));
- countSaveResults(insertResults);
- if (resultDir != null) {
- ResultWrapper resultWrapper = new ResultWrapper(insertResults, null, null);
- createResultsFiles(records, resultWrapper);
- }
- break;
- case "upsert":
- UpsertResult[] upsertResults = client.upsert(
- this.upsertKey, (SObject[])this.records.toArray(new SObject[0]));
- countUpsertResults(upsertResults);
- if (resultDir != null) {
- ResultWrapper resultWrapper = new ResultWrapper(null, upsertResults, null);
- createResultsFiles(records, resultWrapper);
- }
- break;
- case "update":
- SaveResult[] updateResults = client.update(
- (SObject[])this.records.toArray(new SObject[0]));
- countSaveResults(updateResults);
- if (resultDir != null) {
- ResultWrapper resultWrapper = new ResultWrapper(updateResults, null, null);
- createResultsFiles(records, resultWrapper);
- }
- break;
- case "delete":
- List<String> ids = new ArrayList<>();
- for (SObject sobj : this.records) {
- ids.add(sobj.getId());
- }
- DeleteResult[] deleteResults = client.delete(ids.toArray(new String[0]));
- countDeleteResults(deleteResults);
- if (resultDir != null) {
- ResultWrapper resultWrapper = new ResultWrapper(null, null, deleteResults);
- createResultsFiles(records, resultWrapper);
- }
- break;
- }
- this.records = new ArrayList<>();
- }
-
- private void countSaveResults(SaveResult[] saveResults) {
- for (SaveResult saveResult : saveResults) {
- if (saveResult.isSuccess()) {
- this.numOfSuccess++;
- } else {
- this.numOfError++;
- }
- }
- }
-
- private void countUpsertResults(UpsertResult[] upsertResults) {
- for (UpsertResult upsertResult : upsertResults) {
- if (upsertResult.isSuccess()) {
- this.numOfSuccess++;
- } else {
- this.numOfError++;
- }
- }
- }
-
- private void countDeleteResults(DeleteResult[] deleteResults) {
- for (DeleteResult deleteResult : deleteResults) {
- if (deleteResult.isSuccess()) {
- this.numOfSuccess++;
- } else {
- this.numOfError++;
- }
- }
- }
-
- private List<String> createSuccessHeader() {
- final List<String> successHeader = new ArrayList<>();
- successHeader.add("Id");
- for (Column col : pageReader.getSchema().getColumns()) {
- successHeader.add(col.getName());
- }
- return successHeader;
- }
-
- private List<String> createErrorHeader() {
- final List<String> errorHeader = new ArrayList<>();
- for (Column col : pageReader.getSchema().getColumns()) {
- errorHeader.add(col.getName());
- }
- errorHeader.add("Error");
- return errorHeader;
- }
-
- private void createResultsFiles(List<SObject> records, ResultWrapper resultWrapper) throws IOException{
- ICsvListWriter successListWriter = null;
- ICsvListWriter errorListWriter = null;
- try {
- String successFileName = this.resultDir + "/success_" + dateSuffix + ".csv";
- Boolean isExistSuccessFile = new File(successFileName).exists();
- successListWriter = new CsvListWriter(new FileWriter(successFileName, true),
- CsvPreference.STANDARD_PREFERENCE);
- if (!isExistSuccessFile) {
- successListWriter.write(createSuccessHeader());
- }
-
- String errorFileName = this.resultDir + "/error_" + dateSuffix + ".csv";
- Boolean isExistErrorFile = new File(errorFileName).exists();
- errorListWriter = new CsvListWriter(new FileWriter(errorFileName, true),
- CsvPreference.STANDARD_PREFERENCE);
- if (!isExistErrorFile) {
- errorListWriter.write(createErrorHeader());
- }
-
- CellProcessor[] processors = new CellProcessor[pageReader.getSchema().getColumns().size() + 1];
- ArrayList<ArrayList<String>> errorValues = new ArrayList<>();
- for (Integer i = 0, imax = records.size(); i < imax; i++) {
- SObject record = records.get(i);
-
- List<String> values = new ArrayList<>();
- if (resultWrapper.getIsSuccess(i)) {
- values.add(resultWrapper.getId(i));
- }
- for (Column col : pageReader.getSchema().getColumns()) {
- Object obj = record.getSObjectField(col.getName());
- if (obj != null) {
- if (obj instanceof Calendar) {
- DateTime dt = new DateTime((Calendar)obj);
- values.add(dt.toString("yyyy-MM-dd'T'hh:mm:ssZZ"));
- } else {
- values.add(obj.toString());
- }
- } else {
- values.add("");
- }
- }
- if (!resultWrapper.getIsSuccess(i)) {
- StringBuilder sb = new StringBuilder();
- for (com.sforce.soap.partner.Error err : resultWrapper.getErrors(i)) {
- if (sb.length() > 0) {
- sb.append(";");
- }
- sb.append(err.getStatusCode())
- .append(":")
- .append(err.getMessage());
- }
-
- values.add(sb.toString());
- }
- if (resultWrapper.getIsSuccess(i)) {
- successListWriter.write(values);
- } else {
- errorListWriter.write(values);
- }
- }
- } finally {
- if(successListWriter != null ) {
- successListWriter.close();
- }
- if(errorListWriter != null ) {
- errorListWriter.close();
- }
- }
- }
- }
-
- public class ResultWrapper {
- private final SaveResult[] saveResult;
- private final UpsertResult[] upsertResult;
- private final DeleteResult[] deleteResult;
-
- public ResultWrapper(SaveResult[] saveResults,
- UpsertResult[] upsertResults, DeleteResult[] deleteResults) {
- this.saveResult = saveResults;
- this.upsertResult = upsertResults;
- this.deleteResult = deleteResults;
- }
-
- public Boolean isSaveResult() {
- return this.saveResult != null;
- }
-
- public Boolean isUpsertResult() {
- return this.upsertResult != null;
- }
-
- public Boolean isDeleteResult() {
- return this.deleteResult != null;
- }
-
- public String getId(Integer index) {
- if (this.isSaveResult()) {
- return this.saveResult[index].getId();
- } else if (this.isUpsertResult()) {
- return this.upsertResult[index].getId();
- } else if (this.isDeleteResult()) {
- return this.deleteResult[index].getId();
- }
- return null;
- }
-
- public Boolean getIsSuccess(Integer index) {
- if (this.isSaveResult()) {
- return this.saveResult[index].isSuccess();
- } else if (this.isUpsertResult()) {
- return this.upsertResult[index].isSuccess();
- } else if (this.isDeleteResult()) {
- return this.deleteResult[index].isSuccess();
- }
- return null;
- }
-
- public Boolean getIsCreated(Integer index) {
- if (this.isUpsertResult()) {
- return this.upsertResult[index].isCreated();
- }
- return null;
- }
-
- public com.sforce.soap.partner.Error[] getErrors(Integer index) {
- if (this.isSaveResult()) {
- return this.saveResult[index].getErrors();
- } else if (this.isUpsertResult()) {
- return this.upsertResult[index].getErrors();
- } else if (this.isDeleteResult()) {
- return this.deleteResult[index].getErrors();
- }
- return null;
- }
- }
-
-}
+package org.embulk.output;
+
+import com.google.common.base.Optional;
+import com.sforce.soap.partner.Connector;
+import com.sforce.soap.partner.DeleteResult;
+import com.sforce.soap.partner.DescribeSObjectResult;
+import com.sforce.soap.partner.Field;
+import com.sforce.soap.partner.FieldType;
+import com.sforce.soap.partner.GetUserInfoResult;
+import com.sforce.soap.partner.PartnerConnection;
+import com.sforce.soap.partner.SaveResult;
+import com.sforce.soap.partner.UpsertResult;
+import com.sforce.soap.partner.fault.ApiFault;
+import com.sforce.soap.partner.sobject.SObject;
+import com.sforce.ws.ConnectionException;
+import com.sforce.ws.ConnectorConfig;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.embulk.config.TaskReport;
+import org.embulk.config.Config;
+import org.embulk.config.ConfigDefault;
+import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigException;
+import org.embulk.config.ConfigSource;
+import org.embulk.config.Task;
+import org.embulk.config.TaskSource;
+import org.embulk.spi.Column;
+import org.embulk.spi.ColumnVisitor;
+import org.embulk.spi.Exec;
+import org.embulk.spi.OutputPlugin;
+import org.embulk.spi.Page;
+import org.embulk.spi.PageReader;
+import org.embulk.spi.Schema;
+import org.embulk.spi.TransactionalPageOutput;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.CsvListWriter;
+import org.supercsv.io.ICsvListWriter;
+import org.supercsv.prefs.CsvPreference;
+
+public class SalesforceOutputPlugin
+ implements OutputPlugin
+{
+ protected static Logger logger;
+ private static PartnerConnection client = null;
+ private static Map<String, String> externalIdToObjectNameMap = null;
+
+ public interface PluginTask
+ extends Task
+ {
+ @Config("username")
+ public String getUsername();
+
+ @Config("password")
+ public String getPassword();
+
+ @Config("login_endpoint")
+ @ConfigDefault("\"https://login.salesforce.com\"")
+ public Optional<String> getLoginEndpoint();
+
+ @Config("sobject")
+ public String getSObject();
+
+ @Config("upsert_key")
+ @ConfigDefault("null")
+ public Optional<String> getUpsertKey();
+
+ @Config("batch_size")
+ @ConfigDefault("200")
+ public Integer getBatchSize();
+
+ @Config("action")
+ @ConfigDefault("\"insert\"")
+ public Optional<String> getAction();
+
+ @Config("version")
+ @ConfigDefault("34.0")
+ public Optional<String> getVersion();
+
+ @Config("result_dir")
+ @ConfigDefault("null")
+ public Optional<String> getResultDir();
+ }
+
+ @Override
+ public ConfigDiff transaction(ConfigSource config,
+ Schema schema, int taskCount,
+ OutputPlugin.Control control)
+ {
+ PluginTask task = config.loadConfig(PluginTask.class);
+ logger = Exec.getLogger(getClass());
+
+ if (task.getResultDir().isPresent() && task.getResultDir().get() != null) {
+ File resultDir = new File(task.getResultDir().get());
+ if (!resultDir.exists() || !resultDir.isDirectory()) {
+ logger.error("{} is not exist or is not directory.", task.getResultDir().get());
+ throw new RuntimeException(task.getResultDir().get() + " is not exist or is not directory.");
+ }
+ }
+
+ final String username = task.getUsername();
+ final String password = task.getPassword();
+ final String loginEndpoint = task.getLoginEndpoint().get();
+ try {
+ if (client == null) {
+ ConnectorConfig connectorConfig = new ConnectorConfig();
+ connectorConfig.setUsername(username);
+ connectorConfig.setPassword(password);
+ connectorConfig.setAuthEndpoint(loginEndpoint + "/services/Soap/u/" +task.getVersion().get() + "/");
+
+ client = Connector.newConnection(connectorConfig);
+ GetUserInfoResult userInfo = client.getUserInfo();
+ logger.info("login successful with {}", userInfo.getUserName());
+ externalIdToObjectNameMap = new HashMap<>();
+ DescribeSObjectResult describeResult = client.describeSObject(task.getSObject());
+ for (Field field : describeResult.getFields()) {
+ if (field.getType() == FieldType.reference) {
+ externalIdToObjectNameMap.put(field.getRelationshipName(), field.getReferenceTo()[0]);
+ }
+ }
+ }
+ } catch(ConnectionException ex) {
+ logger.error("Login error. Please check your credentials.");
+ throw new RuntimeException(ex);
+ }
+
+ control.run(task.dump());
+ return Exec.newConfigDiff();
+ }
+
+ @Override
+ public ConfigDiff resume(TaskSource taskSource,
+ Schema schema, int taskCount,
+ OutputPlugin.Control control)
+ {
+ throw new UnsupportedOperationException("salesforce output plugin does not support resuming");
+ }
+
+ @Override
+ public void cleanup(TaskSource taskSource,
+ Schema schema, int taskCount,
+ List<TaskReport> successTaskReports)
+ {
+ logger.info("logout");
+ try {
+ if (client != null) {
+ client.logout();
+ }
+ } catch (ConnectionException ex) {}
+ }
+
+ @Override
+ public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex)
+ {
+ PluginTask task = taskSource.loadTask(PluginTask.class);
+
+ PageReader reader = new PageReader(schema);
+ return new SalesforcePageOutput(reader, client, task);
+ }
+
+ public class SalesforcePageOutput
+ implements TransactionalPageOutput
+ {
+ private final String dateSuffix = new SimpleDateFormat("yyyyMMddhhmmssSSS").format(new Date());
+
+ private final PageReader pageReader;
+ private final PartnerConnection client;
+ private final int batchSize;
+ private List<SObject> records;
+ private final String upsertKey;
+ private final String sobject;
+ private final String action;
+ private final String resultDir;
+ private Integer numOfSuccess = 0;
+ private Integer numOfError = 0;
+
+ public SalesforcePageOutput(final PageReader pageReader,
+ PartnerConnection client, PluginTask task)
+ {
+ this.pageReader = pageReader;
+ this.client = client;
+ this.batchSize = task.getBatchSize();
+ this.upsertKey = task.getUpsertKey().isPresent() ? task.getUpsertKey().get() : null;
+ this.sobject = task.getSObject();
+ this.action = task.getAction().isPresent() ? task.getAction().get() : null;
+ this.resultDir = task.getResultDir().isPresent() ? task.getResultDir().get() : null;
+ this.records = new ArrayList<>();
+ }
+
+ @Override
+ public void add(Page page)
+ {
+ try {
+ pageReader.setPage(page);
+ while (pageReader.nextRecord()) {
+ final SObject record = new SObject();
+ record.setType(this.sobject);
+
+ pageReader.getSchema().visitColumns(new ColumnVisitor() {
+ @Override
+ public void doubleColumn(Column column) {
+ columnWithReferenceCheck(column.getName(), pageReader.getDouble(column));
+ }
+ @Override
+ public void timestampColumn(Column column) {
+ DateTime dt = new DateTime(pageReader.getTimestamp(column).getEpochSecond()*1000);
+ Calendar cal = Calendar.getInstance();
+ cal.clear();
+ cal.setTimeZone(dt.getZone().toTimeZone());
+ cal.set(dt.getYear(), dt.getMonthOfYear()-1, dt.getDayOfMonth(),
+ dt.getHourOfDay(), dt.getMinuteOfHour(), dt.getSecondOfMinute());
+ record.addField(column.getName(), cal);
+ }
+ @Override
+ public void stringColumn(Column column) {
+ columnWithReferenceCheck(column.getName(), pageReader.getString(column));
+ }
+ @Override
+ public void longColumn(Column column) {
+ columnWithReferenceCheck(column.getName(), pageReader.getLong(column));
+ }
+ @Override
+ public void booleanColumn(Column column) {
+ record.addField(column.getName(), pageReader.getBoolean(column));
+ }
+
+ private void columnWithReferenceCheck(String name, Object value) {
+ if (name.indexOf('.') > 0) {
+ String[] tokens = name.split("\\.");
+ String referencesFieldName = tokens[0];
+ String externalIdFieldName = tokens[1];
+
+ SObject sObjRef = new SObject();
+ String refFieldApiName = referencesFieldName.replaceAll("__R", "__r");
+ if (externalIdToObjectNameMap.containsKey(refFieldApiName)) {
+ sObjRef.setType(externalIdToObjectNameMap.get(refFieldApiName));
+ } else {
+ throw new ConfigException("Invalid Relationship Name '" + refFieldApiName + "'");
+ }
+ sObjRef.addField(externalIdFieldName, value);
+ record.addField(referencesFieldName, sObjRef);
+ } else {
+ record.addField(name, value);
+ }
+ }
+
+ });
+ this.records.add(record);
+
+ if (this.records.size() >= this.batchSize) {
+ this.action(this.records);
+ logger.info("Number of processed records: {}", this.numOfSuccess + this.numOfError);
+ }
+ }
+
+ if (!this.records.isEmpty()) {
+ this.action(this.records);
+ logger.info("Number of processed records: {}", this.numOfSuccess + this.numOfError);
+ }
+ } catch (ConfigException ex) {
+ logger.error("Configuration Error: {}", ex.getMessage());
+ } catch (ApiFault ex) {
+ logger.error("API Error: {}", ex.getExceptionMessage());
+ } catch (ConnectionException ex) {
+ logger.error("Connection Error: {}", ex.getMessage());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void finish()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public void abort()
+ {
+ }
+
+ @Override
+ public TaskReport commit()
+ {
+ return Exec.newTaskReport();
+ }
+
+ private void action(List<SObject> records) throws ConnectionException, IOException{
+ switch(this.action){
+ case "insert":
+ SaveResult[] insertResults = client.create(
+ (SObject[])this.records.toArray(new SObject[0]));
+ countSaveResults(insertResults);
+ if (resultDir != null) {
+ ResultWrapper resultWrapper = new ResultWrapper(insertResults, null, null);
+ createResultsFiles(records, resultWrapper);
+ }
+ break;
+ case "upsert":
+ UpsertResult[] upsertResults = client.upsert(
+ this.upsertKey, (SObject[])this.records.toArray(new SObject[0]));
+ countUpsertResults(upsertResults);
+ if (resultDir != null) {
+ ResultWrapper resultWrapper = new ResultWrapper(null, upsertResults, null);
+ createResultsFiles(records, resultWrapper);
+ }
+ break;
+ case "update":
+ SaveResult[] updateResults = client.update(
+ (SObject[])this.records.toArray(new SObject[0]));
+ countSaveResults(updateResults);
+ if (resultDir != null) {
+ ResultWrapper resultWrapper = new ResultWrapper(updateResults, null, null);
+ createResultsFiles(records, resultWrapper);
+ }
+ break;
+ case "delete":
+ List<String> ids = new ArrayList<>();
+ for (SObject sobj : this.records) {
+ ids.add(sobj.getId());
+ }
+ DeleteResult[] deleteResults = client.delete(ids.toArray(new String[0]));
+ countDeleteResults(deleteResults);
+ if (resultDir != null) {
+ ResultWrapper resultWrapper = new ResultWrapper(null, null, deleteResults);
+ createResultsFiles(records, resultWrapper);
+ }
+ break;
+ }
+ this.records = new ArrayList<>();
+ }
+
+ private void countSaveResults(SaveResult[] saveResults) {
+ for (SaveResult saveResult : saveResults) {
+ if (saveResult.isSuccess()) {
+ this.numOfSuccess++;
+ } else {
+ this.numOfError++;
+ }
+ }
+ }
+
+ private void countUpsertResults(UpsertResult[] upsertResults) {
+ for (UpsertResult upsertResult : upsertResults) {
+ if (upsertResult.isSuccess()) {
+ this.numOfSuccess++;
+ } else {
+ this.numOfError++;
+ }
+ }
+ }
+
+ private void countDeleteResults(DeleteResult[] deleteResults) {
+ for (DeleteResult deleteResult : deleteResults) {
+ if (deleteResult.isSuccess()) {
+ this.numOfSuccess++;
+ } else {
+ this.numOfError++;
+ }
+ }
+ }
+
+ private List<String> createSuccessHeader() {
+ final List<String> successHeader = new ArrayList<>();
+ successHeader.add("Id");
+ for (Column col : pageReader.getSchema().getColumns()) {
+ successHeader.add(col.getName());
+ }
+ return successHeader;
+ }
+
+ private List<String> createErrorHeader() {
+ final List<String> errorHeader = new ArrayList<>();
+ for (Column col : pageReader.getSchema().getColumns()) {
+ errorHeader.add(col.getName());
+ }
+ errorHeader.add("Error");
+ return errorHeader;
+ }
+
+ private void createResultsFiles(List<SObject> records, ResultWrapper resultWrapper) throws IOException{
+ ICsvListWriter successListWriter = null;
+ ICsvListWriter errorListWriter = null;
+ try {
+ String successFileName = this.resultDir + "/success_" + dateSuffix + ".csv";
+ Boolean isExistSuccessFile = new File(successFileName).exists();
+ successListWriter = new CsvListWriter(new FileWriter(successFileName, true),
+ CsvPreference.STANDARD_PREFERENCE);
+ if (!isExistSuccessFile) {
+ successListWriter.write(createSuccessHeader());
+ }
+
+ String errorFileName = this.resultDir + "/error_" + dateSuffix + ".csv";
+ Boolean isExistErrorFile = new File(errorFileName).exists();
+ errorListWriter = new CsvListWriter(new FileWriter(errorFileName, true),
+ CsvPreference.STANDARD_PREFERENCE);
+ if (!isExistErrorFile) {
+ errorListWriter.write(createErrorHeader());
+ }
+
+ CellProcessor[] processors = new CellProcessor[pageReader.getSchema().getColumns().size() + 1];
+ ArrayList<ArrayList<String>> errorValues = new ArrayList<>();
+ for (Integer i = 0, imax = records.size(); i < imax; i++) {
+ SObject record = records.get(i);
+
+ List<String> values = new ArrayList<>();
+ if (resultWrapper.getIsSuccess(i)) {
+ values.add(resultWrapper.getId(i));
+ }
+ for (Column col : pageReader.getSchema().getColumns()) {
+ Object obj = record.getSObjectField(col.getName());
+ if (obj != null) {
+ if (obj instanceof Calendar) {
+ DateTime dt = new DateTime((Calendar)obj);
+ values.add(dt.toString("yyyy-MM-dd'T'hh:mm:ssZZ"));
+ } else {
+ values.add(obj.toString());
+ }
+ } else {
+ values.add("");
+ }
+ }
+ if (!resultWrapper.getIsSuccess(i)) {
+ StringBuilder sb = new StringBuilder();
+ for (com.sforce.soap.partner.Error err : resultWrapper.getErrors(i)) {
+ if (sb.length() > 0) {
+ sb.append(";");
+ }
+ sb.append(err.getStatusCode())
+ .append(":")
+ .append(err.getMessage());
+ }
+
+ values.add(sb.toString());
+ }
+ if (resultWrapper.getIsSuccess(i)) {
+ successListWriter.write(values);
+ } else {
+ errorListWriter.write(values);
+ }
+ }
+ } finally {
+ if(successListWriter != null ) {
+ successListWriter.close();
+ }
+ if(errorListWriter != null ) {
+ errorListWriter.close();
+ }
+ }
+ }
+ }
+
+ public class ResultWrapper {
+ private final SaveResult[] saveResult;
+ private final UpsertResult[] upsertResult;
+ private final DeleteResult[] deleteResult;
+
+ public ResultWrapper(SaveResult[] saveResults,
+ UpsertResult[] upsertResults, DeleteResult[] deleteResults) {
+ this.saveResult = saveResults;
+ this.upsertResult = upsertResults;
+ this.deleteResult = deleteResults;
+ }
+
+ public Boolean isSaveResult() {
+ return this.saveResult != null;
+ }
+
+ public Boolean isUpsertResult() {
+ return this.upsertResult != null;
+ }
+
+ public Boolean isDeleteResult() {
+ return this.deleteResult != null;
+ }
+
+ public String getId(Integer index) {
+ if (this.isSaveResult()) {
+ return this.saveResult[index].getId();
+ } else if (this.isUpsertResult()) {
+ return this.upsertResult[index].getId();
+ } else if (this.isDeleteResult()) {
+ return this.deleteResult[index].getId();
+ }
+ return null;
+ }
+
+ public Boolean getIsSuccess(Integer index) {
+ if (this.isSaveResult()) {
+ return this.saveResult[index].isSuccess();
+ } else if (this.isUpsertResult()) {
+ return this.upsertResult[index].isSuccess();
+ } else if (this.isDeleteResult()) {
+ return this.deleteResult[index].isSuccess();
+ }
+ return null;
+ }
+
+ public Boolean getIsCreated(Integer index) {
+ if (this.isUpsertResult()) {
+ return this.upsertResult[index].isCreated();
+ }
+ return null;
+ }
+
+ public com.sforce.soap.partner.Error[] getErrors(Integer index) {
+ if (this.isSaveResult()) {
+ return this.saveResult[index].getErrors();
+ } else if (this.isUpsertResult()) {
+ return this.upsertResult[index].getErrors();
+ } else if (this.isDeleteResult()) {
+ return this.deleteResult[index].getErrors();
+ }
+ return null;
+ }
+ }
+
+}