package org.embulk.input.marketo.rest; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import org.apache.commons.lang3.StringUtils; import org.eclipse.jetty.client.util.FormContentProvider; import org.eclipse.jetty.util.Fields; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigException; import org.embulk.config.Task; import org.embulk.input.marketo.MarketoUtils; import org.embulk.input.marketo.model.BulkExtractRangeHeader; import org.embulk.input.marketo.model.MarketoBulkExtractRequest; import org.embulk.input.marketo.model.MarketoError; import org.embulk.input.marketo.model.MarketoField; import org.embulk.input.marketo.model.MarketoResponse; import org.embulk.input.marketo.model.filter.DateRangeFilter; import org.embulk.input.marketo.model.filter.MarketoFilter; import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.type.Type; import org.embulk.spi.type.Types; import org.embulk.util.retryhelper.jetty92.DefaultJetty92ClientCreator; import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper; import org.slf4j.Logger; import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by tai.khuu on 8/22/17. */ public class MarketoRestClient extends MarketoBaseRestClient { private static final String BATCH_SIZE = "batchSize"; private static final String NEXT_PAGE_TOKEN = "nextPageToken"; private static final String OFFSET = "offset"; private static final String MAX_RETURN = "maxReturn"; private static final String MAX_BATCH_SIZE = "300"; private static final String DEFAULT_MAX_RETURN = "200"; private static final String RANGE_HEADER = "Range"; private static final String FILTER_TYPE = "filterType"; private static final String FILTER_VALUES = "filterValues"; private static final String FIELDS = "fields"; private static final int MAX_REQUEST_SIZE = 300; private static final int CONNECT_TIMEOUT_IN_MILLIS = 30000; private static final int IDLE_TIMEOUT_IN_MILLIS = 60000; private String endPoint; private Integer batchSize; private Integer maxReturn; private static final Logger LOGGER = Exec.getLogger(MarketoRestClient.class.getCanonicalName()); private static final Map TYPE_MAPPING = new ImmutableMap.Builder() .put("datetime", Types.TIMESTAMP) .put("email", Types.STRING) .put("float", Types.DOUBLE) .put("integer", Types.LONG) .put("formula", Types.STRING) .put("percent", Types.LONG) .put("url", Types.STRING) .put("phone", Types.STRING) .put("textarea", Types.STRING) .put("text", Types.STRING) .put("string", Types.STRING) .put("score", Types.LONG) .put("boolean", Types.BOOLEAN) .put("currency", Types.DOUBLE) .put("date", Types.TIMESTAMP) .put("reference", Types.STRING) .build(); public interface PluginTask extends Task { @Config("account_id") String getAccountId(); @Config("client_secret") String getClientSecret(); @Config("client_id") String getClientId(); @Config("marketo_limit_interval_milis") @ConfigDefault("20000") Integer getMarketoLimitIntervalMilis(); @Config("batch_size") @ConfigDefault("300") Integer getBatchSize(); void setBatchSize(Integer batchSize); @Config("max_return") @ConfigDefault("200") Integer getMaxReturn(); void setMaxReturn(Integer maxReturn); @Config("read_timeout_millis") @ConfigDefault("60000") Long getReadTimeoutMillis(); @Config("maximum_retries") @ConfigDefault("7") Integer getMaximumRetries(); @Config("initial_retry_interval_milis") @ConfigDefault("20000") Integer getInitialRetryIntervalMilis(); @Config("maximum_retries_interval_milis") @ConfigDefault("120000") Integer getMaximumRetriesIntervalMilis(); } public MarketoRestClient(PluginTask task) { this(MarketoUtils.getEndPoint(task.getAccountId()), MarketoUtils.getIdentityEndPoint(task.getAccountId()), task.getClientId(), task.getClientSecret(), task.getBatchSize(), task.getMaxReturn(), task.getReadTimeoutMillis(), task.getMarketoLimitIntervalMilis(), new Jetty92RetryHelper(task.getMaximumRetries(), task.getInitialRetryIntervalMilis(), task.getMaximumRetriesIntervalMilis(), new DefaultJetty92ClientCreator(CONNECT_TIMEOUT_IN_MILLIS, IDLE_TIMEOUT_IN_MILLIS))); } public MarketoRestClient(String endPoint, String identityEndPoint, String clientId, String clientSecret, Integer batchSize, Integer maxReturn, long readTimeoutMilis, int marketoLimitIntervalMilis, Jetty92RetryHelper retryHelper) { super(identityEndPoint, clientId, clientSecret, marketoLimitIntervalMilis, readTimeoutMilis, retryHelper); this.endPoint = endPoint; this.batchSize = batchSize; this.maxReturn = maxReturn; } public List describeLead() { MarketoResponse jsonResponse = doGet(endPoint + MarketoRESTEndpoint.DESCRIBE_LEAD.getEndpoint(), null, null, new MarketoResponseJetty92EntityReader(this.readTimeoutMillis)); List marketoFields = new ArrayList<>(); List fields = jsonResponse.getResult(); for (int i = 0; i < fields.size(); i++) { ObjectNode field = fields.get(i); String dataType = field.get("dataType").asText(); if (field.has("rest")) { ObjectNode restField = (ObjectNode) field.get("rest"); String name = restField.get("name").asText(); marketoFields.add(new MarketoField(name, dataType)); } } return marketoFields; } private Type getType(String dataType) { return TYPE_MAPPING.containsKey(dataType.toLowerCase()) ? TYPE_MAPPING.get(dataType.toLowerCase()) : Types.STRING; } public String createLeadBulkExtract(Date startTime, Date endTime, List extractFields, String fitlerField) { MarketoBulkExtractRequest marketoBulkExtractRequest = getMarketoBulkExtractRequest(startTime, endTime, extractFields, fitlerField); return sendCreateBulkExtractRequest(marketoBulkExtractRequest, MarketoRESTEndpoint.CREATE_LEAD_EXTRACT); } private MarketoBulkExtractRequest getMarketoBulkExtractRequest(Date startTime, Date endTime, List extractFields, String rangeFilterName) { SimpleDateFormat timeFormat = new SimpleDateFormat(MarketoUtils.MARKETO_DATE_SIMPLE_DATE_FORMAT); MarketoBulkExtractRequest marketoBulkExtractRequest = new MarketoBulkExtractRequest(); if (extractFields != null) { marketoBulkExtractRequest.setFields(extractFields); } marketoBulkExtractRequest.setFormat("CSV"); Map filterMap = new HashMap<>(); DateRangeFilter dateRangeFilter = new DateRangeFilter(); dateRangeFilter.setStartAt(timeFormat.format(startTime)); dateRangeFilter.setEndAt(timeFormat.format(endTime)); filterMap.put(rangeFilterName, dateRangeFilter); marketoBulkExtractRequest.setFilter(filterMap); return marketoBulkExtractRequest; } public String createActivityExtract(Date startTime, Date endTime) { MarketoBulkExtractRequest marketoBulkExtractRequest = getMarketoBulkExtractRequest(startTime, endTime, null, "createdAt"); return sendCreateBulkExtractRequest(marketoBulkExtractRequest, MarketoRESTEndpoint.CREATE_ACTIVITY_EXTRACT); } public String sendCreateBulkExtractRequest(MarketoBulkExtractRequest request, MarketoRESTEndpoint endpoint) { MarketoResponse marketoResponse = null; try { LOGGER.info("Send bulk extract request [{}]", request); marketoResponse = doPost(endPoint + endpoint.getEndpoint(), null, null, OBJECT_MAPPER.writeValueAsString(request), new MarketoResponseJetty92EntityReader(readTimeoutMillis)); } catch (JsonProcessingException e) { LOGGER.error("Encounter exception when deserialize bulk extract request", e); throw new DataException("Can't create bulk extract"); } if (!marketoResponse.isSuccess()) { MarketoError marketoError = marketoResponse.getErrors().get(0); throw new DataException(marketoError.getCode() + ": " + marketoError.getMessage()); } ObjectNode objectNode = marketoResponse.getResult().get(0); return objectNode.get("exportId").asText(); } public void startLeadBulkExtract(String exportId) { startBulkExtract(MarketoRESTEndpoint.START_LEAD_EXPORT_JOB, exportId); } public void startActitvityBulkExtract(String exportId) { startBulkExtract(MarketoRESTEndpoint.START_ACTIVITY_EXPORT_JOB, exportId); } private void startBulkExtract(MarketoRESTEndpoint marketoRESTEndpoint, String exportId) { MarketoResponse marketoResponse = doPost(endPoint + marketoRESTEndpoint.getEndpoint( new ImmutableMap.Builder().put("export_id", exportId).build()), null, null, null, new MarketoResponseJetty92EntityReader(readTimeoutMillis)); if (!marketoResponse.isSuccess()) { MarketoError error = marketoResponse.getErrors().get(0); throw new DataException(String.format("Can't start job for export Job id : %s, error code: %s, error message: %s", exportId, error.getCode(), error.getMessage())); } } /** * Wait for lead bulk extract job * Will block and wait until job status switch to complete * If job run logger than bulk job timeout then will stop and throw exception * If job status is failed or cancel will also throw exception * * @param exportId * @throws InterruptedException */ public void waitLeadExportJobComplete(String exportId, int pollingInterval, int waitTimeout) throws InterruptedException { waitExportJobComplete(MarketoRESTEndpoint.GET_LEAD_EXPORT_STATUS, exportId, pollingInterval, waitTimeout); } /** * Wait for activites bulk extract job * Will block and wait until job status switch to complete * If job run logger than bulk job timeout then will stop and throw exception * If job status is failed or cancel will also throw exception * * @param exportId * @throws InterruptedException */ public void waitActitvityExportJobComplete(String exportId, int pollingInterval, int waitTimeout) throws InterruptedException { waitExportJobComplete(MarketoRESTEndpoint.GET_ACTIVITY_EXPORT_STATUS, exportId, pollingInterval, waitTimeout); } private void waitExportJobComplete(MarketoRESTEndpoint marketoRESTEndpoint, String exportId, int pollingInterval, int waitTimeout) throws InterruptedException { long waitTime = 0; long waitTimeoutMs = waitTimeout * 1000; long now = System.currentTimeMillis(); while (true) { MarketoResponse marketoResponse = doGet(this.endPoint + marketoRESTEndpoint.getEndpoint( new ImmutableMap.Builder().put("export_id", exportId).build()), null, null, new MarketoResponseJetty92EntityReader(readTimeoutMillis)); if (marketoResponse.isSuccess()) { ObjectNode objectNode = marketoResponse.getResult().get(0); String status = objectNode.get("status").asText(); if (status == null) { throw new DataException("Can't get bulk extract status export job id: " + exportId); } LOGGER.info("Jobs [{}] status is [{}]", exportId, status); switch (status) { case "Completed": LOGGER.info("Total wait time ms is [{}]", waitTime); LOGGER.info("File size is [{}] bytes", objectNode.get("fileSize")); return; case "Failed": throw new DataException("Bulk extract job failed exportId: " + exportId + " errorMessage: " + objectNode.get("errorMsg").asText()); case "Cancel": throw new DataException("Bulk extract job canceled, exportId: " + exportId); } } Thread.sleep(pollingInterval * 1000); waitTime = System.currentTimeMillis() - now; if (waitTime >= waitTimeoutMs) { throw new DataException("Job timeout exception, exportJob: " + exportId + ", run longer than " + waitTimeout + " seconds"); } } } public InputStream getLeadBulkExtractResult(String exportId, BulkExtractRangeHeader bulkExtractRangeHeader) { return getBulkExtractResult(MarketoRESTEndpoint.GET_LEAD_EXPORT_RESULT, exportId, bulkExtractRangeHeader); } public InputStream getActivitiesBulkExtractResult(String exportId, BulkExtractRangeHeader bulkExtractRangeHeader) { return getBulkExtractResult(MarketoRESTEndpoint.GET_ACTIVITY_EXPORT_RESULT, exportId, bulkExtractRangeHeader); } private InputStream getBulkExtractResult(MarketoRESTEndpoint endpoint, String exportId, BulkExtractRangeHeader bulkExtractRangeHeader) { LOGGER.info("Download bulk export job [{}]", exportId); Map headers = new HashMap<>(); if (bulkExtractRangeHeader != null) { headers.put(RANGE_HEADER, bulkExtractRangeHeader.toRangeHeaderValue()); LOGGER.info("Range header value [{}]", bulkExtractRangeHeader.toRangeHeaderValue()); } return doGet(this.endPoint + endpoint.getEndpoint(new ImmutableMap.Builder().put("export_id", exportId).build()), headers, null, new MarketoInputStreamResponseEntityReader(readTimeoutMillis)); } public RecordPagingIterable getLists() { return getRecordWithTokenPagination(endPoint + MarketoRESTEndpoint.GET_LISTS.getEndpoint(), new ImmutableListMultimap.Builder().put(BATCH_SIZE, MAX_BATCH_SIZE).build(), ObjectNode.class); } public RecordPagingIterable getPrograms() { return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_PROGRAMS.getEndpoint(), new ImmutableListMultimap.Builder().put(MAX_RETURN, DEFAULT_MAX_RETURN).build(), ObjectNode.class); } public RecordPagingIterable getLeadsByProgram(String programId, String fieldNames) { Multimap multimap = ArrayListMultimap.create(); multimap.put("fields", fieldNames); return getRecordWithTokenPagination(endPoint + MarketoRESTEndpoint.GET_LEADS_BY_PROGRAM.getEndpoint(new ImmutableMap.Builder().put("program_id", programId).build()), multimap, ObjectNode.class); } public RecordPagingIterable getLeadsByList(String listId, String fieldNames) { Multimap multimap = ArrayListMultimap.create(); multimap.put("fields", fieldNames); return getRecordWithTokenPagination(endPoint + MarketoRESTEndpoint.GET_LEADS_BY_LIST.getEndpoint(new ImmutableMap.Builder().put("list_id", listId).build()), multimap, ObjectNode.class); } public RecordPagingIterable getCampaign() { return getRecordWithTokenPagination(endPoint + MarketoRESTEndpoint.GET_CAMPAIGN.getEndpoint(), null, ObjectNode.class); } private RecordPagingIterable getRecordWithOffsetPagination(final String endPoint, final Multimap parameters, final Class recordClass) { return new RecordPagingIterable<>(new RecordPagingIterable.PagingFunction>() { @Override public RecordPagingIterable.OffsetPage getNextPage(RecordPagingIterable.OffsetPage currentPage) { return getOffsetPage(currentPage.getNextOffSet()); } @Override public RecordPagingIterable.OffsetPage getFirstPage() { return getOffsetPage(0); } private RecordPagingIterable.OffsetPage getOffsetPage(int offset) { ImmutableListMultimap.Builder params = new ImmutableListMultimap.Builder<>(); params.put(OFFSET, String.valueOf(offset)); params.put(MAX_RETURN, String.valueOf(maxReturn)); if (parameters != null) { params.putAll(parameters); } MarketoResponse marketoResponse = doGet(endPoint, null, params.build(), new MarketoResponseJetty92EntityReader<>(readTimeoutMillis, recordClass)); return new RecordPagingIterable.OffsetPage<>(marketoResponse.getResult(), offset + marketoResponse.getResult().size(), marketoResponse.getResult().size() == maxReturn); } }); } private RecordPagingIterable getRecordWithTokenPagination(final String endPoint, final Multimap parameters, final Class recordClass) { return new RecordPagingIterable<>(new RecordPagingIterable.PagingFunction>() { @Override public RecordPagingIterable.TokenPage getNextPage(RecordPagingIterable.TokenPage currentPage) { return getTokenPage(currentPage); } @Override public RecordPagingIterable.TokenPage getFirstPage() { return getTokenPage(null); } @SuppressWarnings("unchecked") private RecordPagingIterable.TokenPage getTokenPage(RecordPagingIterable.TokenPage page) { ImmutableListMultimap.Builder params = new ImmutableListMultimap.Builder<>(); params.put("_method", "GET"); Fields fields = new Fields(); if (page != null) { fields.add(NEXT_PAGE_TOKEN, page.getNextPageToken()); } fields.add(BATCH_SIZE, String.valueOf(batchSize)); if (parameters != null) { for (String key : parameters.keySet()) { //params that is passed in should overwrite default fields.remove(key); for (String value : parameters.get(key)) { fields.add(key, value); } } } //Let do GET Disguise in POST here to overcome Marketo URI Too long error FormContentProvider formContentProvider = new FormContentProvider(fields); MarketoResponse marketoResponse = doPost(endPoint, null, params.build(), new MarketoResponseJetty92EntityReader<>(readTimeoutMillis, recordClass), formContentProvider); return new RecordPagingIterable.TokenPage<>(marketoResponse.getResult(), marketoResponse.getNextPageToken(), marketoResponse.getNextPageToken() != null); } }); } public Iterable getProgramsByTag(String tagType, String tagValue) { Multimap multimap = ArrayListMultimap.create(); multimap.put("tagType", tagType); multimap.put("tagValue", tagValue); return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_PROGRAMS_BY_TAG.getEndpoint(), multimap, ObjectNode.class); } public Iterable getProgramsByDateRange(Date earliestUpdatedAt, Date latestUpdatedAt, String filterType, List filterValues) { SimpleDateFormat timeFormat = new SimpleDateFormat(MarketoUtils.MARKETO_DATE_SIMPLE_DATE_FORMAT); Multimap multimap = ArrayListMultimap.create(); multimap.put("earliestUpdatedAt", timeFormat.format(earliestUpdatedAt)); multimap.put("latestUpdatedAt", timeFormat.format(latestUpdatedAt)); // put filter params if exist. if (filterType != null) { multimap.put("filterType", filterType); multimap.put("filterValues", String.join(",", filterValues)); } return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_PROGRAMS.getEndpoint(), multimap, ObjectNode.class); } public List describeCustomObject(String apiName) { MarketoResponse jsonResponse = doGet(endPoint + MarketoRESTEndpoint.GET_CUSTOM_OBJECT_DESCRIBE.getEndpoint(new ImmutableMap.Builder().put("api_name", apiName).build()), null, null, new MarketoResponseJetty92EntityReader(this.readTimeoutMillis)); if (jsonResponse.getResult().size() == 0) { throw new ConfigException(String.format("Custom Object %s is not exits.", apiName)); } List marketoFields = new ArrayList<>(); JsonNode fieldNodes = jsonResponse.getResult().get(0).path("fields"); for (JsonNode node : fieldNodes) { String dataType = node.get("dataType").asText(); String name = node.get("name").asText(); marketoFields.add(new MarketoField(name, dataType)); } if (marketoFields.size() == 0) { throw new ConfigException(String.format("Custom Object %s don't have any field data.", apiName)); } return marketoFields; } private RecordPagingIterable getCustomObjectRecordWithPagination(final String endPoint, final String customObjectFilterType, final String customObjectFields, final Integer fromValue, final Integer toValue, final Class recordClass) { return new RecordPagingIterable<>(new RecordPagingIterable.PagingFunction>() { @Override public RecordPagingIterable.OffsetWithTokenPage getNextPage(RecordPagingIterable.OffsetWithTokenPage currentPage) { return getOffsetPage(currentPage.getNextOffSet(), currentPage.getNextPageToken()); } @Override public RecordPagingIterable.OffsetWithTokenPage getFirstPage() { return getOffsetPage(fromValue, ""); } private RecordPagingIterable.OffsetWithTokenPage getOffsetPage(int offset, String nextPageToken) { boolean isMoreResult = true; boolean isEndOffset = false; int nextOffset = offset + MAX_REQUEST_SIZE; if (toValue != null) { if (toValue <= nextOffset) { nextOffset = toValue + 1; isEndOffset = true; } } StringBuilder filterValues = new StringBuilder(); for (int i = offset; i < (nextOffset - 1); i++) { filterValues.append(String.valueOf(i)).append(","); } filterValues.append(String.valueOf(nextOffset - 1)); ImmutableListMultimap.Builder params = new ImmutableListMultimap.Builder<>(); params.put(FILTER_TYPE, customObjectFilterType); params.put(FILTER_VALUES, filterValues.toString()); if (StringUtils.isNotBlank(nextPageToken)) { params.put(NEXT_PAGE_TOKEN, nextPageToken); } if (customObjectFields != null) { params.put(FIELDS, customObjectFields); } MarketoResponse marketoResponse = doGet(endPoint, null, params.build(), new MarketoResponseJetty92EntityReader<>(readTimeoutMillis, recordClass)); String nextToken = ""; if (StringUtils.isNotBlank(marketoResponse.getNextPageToken())) { nextToken = marketoResponse.getNextPageToken(); //skip offset when nextPageToken is exits. nextOffset = offset; } if (toValue == null) { if (marketoResponse.getResult().isEmpty()) { isMoreResult = false; } } else { if (isEndOffset && StringUtils.isBlank(nextToken)) { isMoreResult = false; } } return new RecordPagingIterable.OffsetWithTokenPage<>(marketoResponse.getResult(), nextOffset, nextToken, isMoreResult); } }); } public Iterable getCustomObject(String customObjectAPIName, String customObjectFilterType, String customObjectFields, Integer fromValue, Integer toValue) { return getCustomObjectRecordWithPagination(endPoint + MarketoRESTEndpoint.GET_CUSTOM_OBJECT.getEndpoint(new ImmutableMap.Builder().put("api_name", customObjectAPIName).build()), customObjectFilterType, customObjectFields, fromValue, toValue, ObjectNode.class); } }