src/main/java/org/embulk/output/SalesforceOutputPlugin.java in embulk-output-salesforce-0.1.1 vs src/main/java/org/embulk/output/SalesforceOutputPlugin.java in embulk-output-salesforce-0.1.2
- old
+ new
@@ -1,29 +1,36 @@
package org.embulk.output;
import com.google.common.base.Optional;
import com.sforce.soap.partner.Connector;
import com.sforce.soap.partner.DeleteResult;
+import com.sforce.soap.partner.DescribeSObjectResult;
+import com.sforce.soap.partner.Field;
+import com.sforce.soap.partner.FieldType;
import com.sforce.soap.partner.GetUserInfoResult;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.UpsertResult;
+import com.sforce.soap.partner.fault.ApiFault;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.embulk.config.CommitReport;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
@@ -43,11 +50,12 @@
public class SalesforceOutputPlugin
implements OutputPlugin
{
protected static Logger logger;
private static PartnerConnection client = null;
-
+ private static Map<String, String> externalIdToObjectNameMap = null;
+
public interface PluginTask
extends Task
{
@Config("username")
public String getUsername();
@@ -110,10 +118,17 @@
connectorConfig.setAuthEndpoint(loginEndpoint + "/services/Soap/u/" +task.getVersion().get() + "/");
client = Connector.newConnection(connectorConfig);
GetUserInfoResult userInfo = client.getUserInfo();
logger.info("login successful with {}", userInfo.getUserName());
+ externalIdToObjectNameMap = new HashMap<>();
+ DescribeSObjectResult describeResult = client.describeSObject(task.getSObject());
+ for (Field field : describeResult.getFields()) {
+ if (field.getType() == FieldType.reference) {
+ externalIdToObjectNameMap.put(field.getRelationshipName(), field.getReferenceTo()[0]);
+ }
+ }
}
} catch(ConnectionException ex) {
logger.error("Login error. Please check your credentials.");
throw new RuntimeException(ex);
}
@@ -191,11 +206,11 @@
record.setType(this.sobject);
pageReader.getSchema().visitColumns(new ColumnVisitor() {
@Override
public void doubleColumn(Column column) {
- record.addField(column.getName(), pageReader.getDouble(column));
+ columnWithReferenceCheck(column.getName(), pageReader.getDouble(column));
}
@Override
public void timestampColumn(Column column) {
DateTime dt = new DateTime(pageReader.getTimestamp(column).getEpochSecond()*1000);
Calendar cal = Calendar.getInstance();
@@ -205,21 +220,41 @@
dt.getHourOfDay(), dt.getMinuteOfHour(), dt.getSecondOfMinute());
record.addField(column.getName(), cal);
}
@Override
public void stringColumn(Column column) {
- record.addField(column.getName(), pageReader.getString(column));
+ columnWithReferenceCheck(column.getName(), pageReader.getString(column));
}
@Override
public void longColumn(Column column) {
- record.addField(column.getName(), pageReader.getLong(column));
+ columnWithReferenceCheck(column.getName(), pageReader.getLong(column));
}
@Override
public void booleanColumn(Column column) {
record.addField(column.getName(), pageReader.getBoolean(column));
}
-
+
+ private void columnWithReferenceCheck(String name, Object value) {
+ if (name.indexOf('.') > 0) {
+ String[] tokens = name.split("\\.");
+ String referencesFieldName = tokens[0];
+ String externalIdFieldName = tokens[1];
+
+ SObject sObjRef = new SObject();
+ String refFieldApiName = referencesFieldName.replaceAll("__R", "__r");
+ if (externalIdToObjectNameMap.containsKey(refFieldApiName)) {
+ sObjRef.setType(externalIdToObjectNameMap.get(refFieldApiName));
+ } else {
+ throw new ConfigException("Invalid Relationship Name '" + refFieldApiName + "'");
+ }
+ sObjRef.addField(externalIdFieldName, value);
+ record.addField(referencesFieldName, sObjRef);
+ } else {
+ record.addField(name, value);
+ }
+ }
+
});
this.records.add(record);
if (this.records.size() >= this.batchSize) {
this.action(this.records);
@@ -229,9 +264,13 @@
if (!this.records.isEmpty()) {
this.action(this.records);
logger.info("Number of processed records: {}", this.numOfSuccess + this.numOfError);
}
+ } catch (ConfigException ex) {
+ logger.error("Configuration Error: {}", ex.getMessage());
+ } catch (ApiFault ex) {
+ logger.error("API Error: {}", ex.getExceptionMessage());
} catch (ConnectionException ex) {
logger.error("Connection Error: {}", ex.getMessage());
} catch (IOException ex) {
throw new RuntimeException(ex);
}