package org.embulk.input.marketo;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.embulk.input.marketo.model.BulkExtractRangeHeader;
import org.embulk.input.marketo.model.MarketoField;
import org.embulk.input.marketo.rest.MarketoRestClient;
import org.embulk.input.marketo.rest.RecordPagingIterable;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.slf4j.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;

/**
 * {@link MarketoService} implementation that delegates to a {@link MarketoRestClient}.
 *
 * <p>Besides plain delegation it drives the bulk-extract workflow (create, start, wait, download with
 * resume) and keeps a local copy of every downloaded activity extract under
 * {@code /tmp/marketo_<dd-MM-yyyy>.csv}, which is reused when a landing zone is configured.
 *
 * <p>NOTE(review): the public method signatures below use raw {@code List}/{@code Iterable} types exactly
 * as they appear in this copy of the file. Generic type arguments look like they were stripped from it;
 * confirm against {@code MarketoService} before adding them back.
 *
 * <p>TODO: an S3 landing-zone lookup was sketched here in commented-out code but never implemented;
 * {@code landingZoneFile} currently only switches the local cache lookup on.
 *
 * Created by tai.khuu on 9/6/17.
 */
public class MarketoServiceImpl implements MarketoService {
    private static final Logger LOGGER = Exec.getLogger(MarketoServiceImpl.class);

    private static final String DEFAULT_FILE_FORMAT = "csv";

    private static final int BUF_SIZE = 0x1000;

    private static final int MAX_RESUME_TIME = 50;

    private static final String ACTIVITY_CACHE_PREFIX = "/tmp/marketo_";

    private static final String ACTIVITY_CACHE_SUFFIX = ".csv";

    private static final String ACTIVITY_CACHE_DATE_PATTERN = "dd-MM-yyyy";

    private final MarketoRestClient marketoRestClient;

    public MarketoServiceImpl(MarketoRestClient marketoRestClient) {
        this.marketoRestClient = marketoRestClient;
    }

    /**
     * Creates and starts a lead bulk extract, waits for the export job, and downloads the result.
     *
     * @return the temp file holding the downloaded extract
     * @throws DataException if the wait is interrupted or the download cannot be completed
     */
    @Override
    public File extractLead(final Date startTime, Date endTime, List extractedFields, String filterField,
                            int pollingTimeIntervalSecond, final int bulkJobTimeoutSecond) {
        final String exportID = marketoRestClient.createLeadBulkExtract(startTime, endTime, extractedFields, filterField);
        marketoRestClient.startLeadBulkExtract(exportID);
        try {
            marketoRestClient.waitLeadExportJobComplete(exportID, pollingTimeIntervalSecond, bulkJobTimeoutSecond);
        } catch (InterruptedException e) {
            throw interruptedWhileWaiting(exportID, e);
        }
        // Lead extracts are never written to the activity cache, hence the null cache path.
        return downloadBulkExtract(new Function<BulkExtractRangeHeader, InputStream>() {
            @Override
            public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader) {
                return marketoRestClient.getLeadBulkExtractResult(exportID, bulkExtractRangeHeader);
            }
        }, null);
    }

    /**
     * Appends everything readable from {@code extractResult} to {@code tempFile}.
     *
     * <p>The file is opened in append mode so that a resumed download continues where the previous
     * attempt stopped. The stream is not closed here; the caller owns it.
     *
     * @return the number of bytes written by this call
     * @throws DownloadBulkExtractException on any I/O failure, carrying the bytes written so far
     */
    private long saveExtractedFile(InputStream extractResult, File tempFile) throws DownloadBulkExtractException {
        long total = 0;
        try (OutputStream fileOutputStream = new FileOutputStream(tempFile, true)) {
            byte[] buf = new byte[BUF_SIZE];
            int read;
            while ((read = extractResult.read(buf)) != -1) {
                fileOutputStream.write(buf, 0, read);
                total += read;
            }
        } catch (IOException e) {
            LOGGER.error("Encounter exception when download bulk extract file", e);
            throw new DownloadBulkExtractException("Encounter exception when download bulk extract file", e, total);
        }
        return total;
    }

    /**
     * Returns the activity extract for the given range, reusing the local cache file when allowed.
     *
     * <p>When {@code landingZoneFile} is non-empty and a readable cache file for {@code startTime}
     * exists, that file is returned without contacting Marketo. Otherwise a bulk extract is created,
     * started, awaited and downloaded, and the result is also copied to the cache path.
     *
     * @param landingZoneFile only checked for being non-empty; its value is not otherwise read here
     * @return the cached file, or the temp file holding the freshly downloaded extract
     * @throws DataException if the wait is interrupted or the download cannot be completed
     */
    @Override
    public File extractAllActivity(List activityTypeIds, Date startTime, Date endTime, int pollingTimeIntervalSecond,
                                   int bulkJobTimeoutSecond, String landingZoneFile) {
        final String cachePath = activityCachePath(startTime);
        if (landingZoneFile != null && !landingZoneFile.isEmpty()) {
            LOGGER.info("Let's check for temp File = {}", cachePath);
            try {
                File cached = new File(cachePath);
                if (cached.isFile() && cached.canRead()) {
                    return cached;
                }
            } catch (SecurityException e) {
                // Best effort: an unreadable cache must not prevent the normal extract.
                LOGGER.warn("Exception on retrieving from landing zone, continuing with normal execution", e);
            }
        }

        final String exportID = marketoRestClient.createActivityExtract(activityTypeIds, startTime, endTime);
        marketoRestClient.startActitvityBulkExtract(exportID);
        try {
            marketoRestClient.waitActitvityExportJobComplete(exportID, pollingTimeIntervalSecond, bulkJobTimeoutSecond);
        } catch (InterruptedException e) {
            throw interruptedWhileWaiting(exportID, e);
        }
        return downloadBulkExtract(new Function<BulkExtractRangeHeader, InputStream>() {
            @Override
            public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader) {
                return marketoRestClient.getActivitiesBulkExtractResult(exportID, bulkExtractRangeHeader);
            }
        }, cachePath);
    }

    /**
     * Downloads a bulk extract into a new temp file, resuming from the last written byte on failure.
     *
     * @param getBulkExtractfunction opens the extract stream for a given range header
     * @param cachePath              where to keep an extra copy of the finished download, or
     *                               {@code null} to skip the copy
     * @return the temp file holding the complete extract
     * @throws DataException if the download still fails after {@link #MAX_RESUME_TIME} attempts
     */
    private File downloadBulkExtract(Function<BulkExtractRangeHeader, InputStream> getBulkExtractfunction,
                                     String cachePath) {
        final File tempFile = Exec.getTempFileSpace().createTempFile(DEFAULT_FILE_FORMAT);
        long startByte = 0;
        for (int resumeTime = 0; resumeTime < MAX_RESUME_TIME; resumeTime++) {
            InputStream bulkExtractResult = getBulkExtractfunction.apply(new BulkExtractRangeHeader(startByte));
            try {
                saveExtractedFile(bulkExtractResult, tempFile);
                if (cachePath != null) {
                    copyToCache(tempFile, cachePath);
                }
                return tempFile;
            } catch (DownloadBulkExtractException e) {
                startByte = startByte + e.getByteWritten();
                LOGGER.warn("will resume activity bulk extract at byte [{}]", startByte);
            } finally {
                // Release the response stream of this attempt whether it succeeded or not.
                closeQuietly(bulkExtractResult);
            }
        }
        //Too many resume we still can't get the file
        throw new DataException("Can't down load bulk extract");
    }

    /** Builds the cache file path for an activity extract starting at {@code startTime}. */
    private static String activityCachePath(Date startTime) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        DateFormat dateFormat = new SimpleDateFormat(ACTIVITY_CACHE_DATE_PATTERN);
        return ACTIVITY_CACHE_PREFIX + dateFormat.format(startTime) + ACTIVITY_CACHE_SUFFIX;
    }

    /** Copies a finished download to the cache path; a failed copy is logged and otherwise ignored. */
    private static void copyToCache(File downloaded, String cachePath) {
        File destination = new File(cachePath);
        try {
            FileUtils.copyFile(downloaded, destination);
            LOGGER.info("Save activities in file: {}", cachePath);
        } catch (IOException e) {
            LOGGER.error("Exception when copying to file destination {}", destination.getPath(), e);
        }
    }

    /** Closes the stream if present; a failure to close is logged and otherwise ignored. */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            LOGGER.warn("Failed to close bulk extract stream", e);
        }
    }

    /**
     * Logs the interruption, restores the thread's interrupt flag and builds the exception to throw.
     */
    private static DataException interruptedWhileWaiting(String exportID, InterruptedException e) {
        LOGGER.error("Exception when waiting for export job id: {}", exportID, e);
        Thread.currentThread().interrupt();
        DataException failure = new DataException("Error when wait for bulk extract");
        failure.initCause(e);
        return failure;
    }

    /**
     * Returns the leads of every list, each record tagged with the id of the list it came from.
     *
     * @param fieldNames lead fields to request; joined with commas before being passed on
     */
    @Override
    public Iterable getAllListLead(List fieldNames) {
        RecordPagingIterable lists = marketoRestClient.getLists();
        final String fieldNameString = StringUtils.join(fieldNames, ",");
        return MarketoUtils.flatMap(lists, new Function<ObjectNode, Iterable<ObjectNode>>() {
            @Override
            public Iterable<ObjectNode> apply(ObjectNode list) {
                final String id = list.get("id").asText();
                return Iterables.transform(marketoRestClient.getLeadsByList(id, fieldNameString),
                        new Function<ObjectNode, ObjectNode>() {
                            @Override
                            public ObjectNode apply(ObjectNode lead) {
                                lead.put(MarketoUtils.LIST_ID_COLUMN_NAME, id);
                                return lead;
                            }
                        });
            }
        });
    }

    /**
     * Returns the leads of every program, each record tagged with the id of the program it came from.
     *
     * @param fieldNames lead fields to request; joined with commas before being passed on
     */
    @Override
    public Iterable getAllProgramLead(List fieldNames) {
        RecordPagingIterable programs = marketoRestClient.getPrograms();
        final String fieldNameString = StringUtils.join(fieldNames, ",");
        return MarketoUtils.flatMap(programs, new Function<ObjectNode, Iterable<ObjectNode>>() {
            @Override
            public Iterable<ObjectNode> apply(ObjectNode program) {
                final String id = program.get("id").asText();
                return Iterables.transform(marketoRestClient.getLeadsByProgram(id, fieldNameString),
                        new Function<ObjectNode, ObjectNode>() {
                            @Override
                            public ObjectNode apply(ObjectNode lead) {
                                lead.put(MarketoUtils.PROGRAM_ID_COLUMN_NAME, id);
                                return lead;
                            }
                        });
            }
        });
    }

    /** Delegates to {@link MarketoRestClient#getCampaign()}. */
    @Override
    public RecordPagingIterable getCampaign() {
        return marketoRestClient.getCampaign();
    }

    /** Delegates to {@link MarketoRestClient#describeLead()}. */
    @Override
    public List describeLead() {
        return marketoRestClient.describeLead();
    }

    /**
     * Signals a failed download attempt and records how many bytes were written before the failure,
     * so the caller can resume from that offset.
     */
    private static class DownloadBulkExtractException extends Exception {
        private final long byteWritten;

        public DownloadBulkExtractException(String message, Throwable cause, long byteWritten) {
            super(message, cause);
            this.byteWritten = byteWritten;
        }

        public DownloadBulkExtractException(Throwable cause, long byteWritten) {
            super(cause);
            this.byteWritten = byteWritten;
        }

        public long getByteWritten() {
            return byteWritten;
        }
    }

    /** Delegates to {@link MarketoRestClient#getPrograms()}. */
    @Override
    public Iterable getPrograms() {
        return marketoRestClient.getPrograms();
    }

    /** Delegates to the REST client's {@code getProgramsByTag}. */
    @Override
    public Iterable getProgramsByTag(String tagType, String tagValue) {
        return marketoRestClient.getProgramsByTag(tagType, tagValue);
    }

    /** Delegates to the REST client's {@code getProgramsByDateRange}. */
    @Override
    public Iterable getProgramsByDateRange(Date earliestUpdatedAt, Date latestUpdatedAt, String filterType,
                                           List filterValues) {
        return marketoRestClient.getProgramsByDateRange(earliestUpdatedAt, latestUpdatedAt, filterType, filterValues);
    }

    /** Delegates to the REST client's {@code describeCustomObject}. */
    @Override
    public List describeCustomObject(String customObjectAPIName) {
        return marketoRestClient.describeCustomObject(customObjectAPIName);
    }

    /** Delegates to the REST client's {@code getCustomObject}. */
    @Override
    public Iterable getCustomObject(String customObjectAPIName, String customObjectFilterType,
                                    String customObjectFields, Integer fromValue, Integer toValue) {
        return marketoRestClient.getCustomObject(customObjectAPIName, customObjectFilterType, customObjectFields,
                fromValue, toValue);
    }

    /** Delegates to {@link MarketoRestClient#getActivityTypes()}. */
    @Override
    public Iterable getActivityTypes() {
        return marketoRestClient.getActivityTypes();
    }
}