src/main/java/org/embulk/output/SalesforceOutputPlugin.java in embulk-output-salesforce-0.1.1 vs src/main/java/org/embulk/output/SalesforceOutputPlugin.java in embulk-output-salesforce-0.1.2

- old
+ new

@@ -1,29 +1,36 @@ package org.embulk.output; import com.google.common.base.Optional; import com.sforce.soap.partner.Connector; import com.sforce.soap.partner.DeleteResult; +import com.sforce.soap.partner.DescribeSObjectResult; +import com.sforce.soap.partner.Field; +import com.sforce.soap.partner.FieldType; import com.sforce.soap.partner.GetUserInfoResult; import com.sforce.soap.partner.PartnerConnection; import com.sforce.soap.partner.SaveResult; import com.sforce.soap.partner.UpsertResult; +import com.sforce.soap.partner.fault.ApiFault; import com.sforce.soap.partner.sobject.SObject; import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.embulk.config.CommitReport; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; @@ -43,11 +50,12 @@ public class SalesforceOutputPlugin implements OutputPlugin { protected static Logger logger; private static PartnerConnection client = null; - + private static Map<String, String> externalIdToObjectNameMap = null; + public interface PluginTask extends Task { @Config("username") public String getUsername(); @@ -110,10 +118,17 @@ connectorConfig.setAuthEndpoint(loginEndpoint + "/services/Soap/u/" +task.getVersion().get() + "/"); client = Connector.newConnection(connectorConfig); GetUserInfoResult userInfo = client.getUserInfo(); logger.info("login successful with {}", userInfo.getUserName()); + externalIdToObjectNameMap = new HashMap<>(); + DescribeSObjectResult describeResult = client.describeSObject(task.getSObject()); + for (Field field : describeResult.getFields()) { + if (field.getType() == FieldType.reference) { + externalIdToObjectNameMap.put(field.getRelationshipName(), field.getReferenceTo()[0]); + } + } } } catch(ConnectionException ex) { logger.error("Login error. Please check your credentials."); throw new RuntimeException(ex); } @@ -191,11 +206,11 @@ record.setType(this.sobject); pageReader.getSchema().visitColumns(new ColumnVisitor() { @Override public void doubleColumn(Column column) { - record.addField(column.getName(), pageReader.getDouble(column)); + columnWithReferenceCheck(column.getName(), pageReader.getDouble(column)); } @Override public void timestampColumn(Column column) { DateTime dt = new DateTime(pageReader.getTimestamp(column).getEpochSecond()*1000); Calendar cal = Calendar.getInstance(); @@ -205,21 +220,41 @@ dt.getHourOfDay(), dt.getMinuteOfHour(), dt.getSecondOfMinute()); record.addField(column.getName(), cal); } @Override public void stringColumn(Column column) { - record.addField(column.getName(), pageReader.getString(column)); + columnWithReferenceCheck(column.getName(), pageReader.getString(column)); } @Override public void longColumn(Column column) { - record.addField(column.getName(), pageReader.getLong(column)); + columnWithReferenceCheck(column.getName(), pageReader.getLong(column)); } @Override public void booleanColumn(Column column) { record.addField(column.getName(), pageReader.getBoolean(column)); } - + + private void columnWithReferenceCheck(String name, Object value) { + if (name.indexOf('.') > 0) { + String[] tokens = name.split("\\."); + String referencesFieldName = tokens[0]; + String externalIdFieldName = tokens[1]; + + SObject sObjRef = new SObject(); + String refFieldApiName = referencesFieldName.replaceAll("__R", "__r"); + if (externalIdToObjectNameMap.containsKey(refFieldApiName)) { + sObjRef.setType(externalIdToObjectNameMap.get(refFieldApiName)); + } else { + throw new ConfigException("Invalid Relationship Name '" + refFieldApiName + "'"); + } + sObjRef.addField(externalIdFieldName, value); + record.addField(referencesFieldName, sObjRef); + } else { + record.addField(name, value); + } + } + }); this.records.add(record); if (this.records.size() >= this.batchSize) { this.action(this.records); @@ -229,9 +264,13 @@ if (!this.records.isEmpty()) { this.action(this.records); logger.info("Number of processed records: {}", this.numOfSuccess + this.numOfError); } + } catch (ConfigException ex) { + logger.error("Configuration Error: {}", ex.getMessage()); + } catch (ApiFault ex) { + logger.error("API Error: {}", ex.getExceptionMessage()); } catch (ConnectionException ex) { logger.error("Connection Error: {}", ex.getMessage()); } catch (IOException ex) { throw new RuntimeException(ex); }