package org.embulk.input.marketo.delegate; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Iterators; import org.embulk.base.restclient.jackson.JacksonServiceRecord; import org.embulk.base.restclient.jackson.JacksonServiceValue; import org.embulk.base.restclient.record.RecordImporter; import org.embulk.base.restclient.record.ServiceRecord; import org.embulk.base.restclient.record.ValueLocator; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigInject; import org.embulk.config.TaskReport; import org.embulk.input.marketo.CsvTokenizer; import org.embulk.input.marketo.MarketoService; import org.embulk.input.marketo.MarketoServiceImpl; import org.embulk.input.marketo.MarketoUtils; import org.embulk.input.marketo.rest.MarketoRestClient; import org.embulk.spi.BufferAllocator; import org.embulk.spi.Column; import org.embulk.spi.ColumnVisitor; import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; import org.embulk.spi.json.JsonParser; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampParser; import org.embulk.spi.util.InputStreamFileInput; import org.embulk.spi.util.LineDecoder; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; import org.msgpack.value.Value; import java.io.InputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; /** * Created by tai.khuu on 9/18/17. */ public abstract class MarketoBaseBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate { private static final String LATEST_FETCH_TIME = "latest_fetch_time"; private static final String LATEST_UID_LIST = "latest_uids"; private static final DateTimeFormatter ISO_DATETIME_FORMAT = ISODateTimeFormat.dateTimeParser(); private static final String FROM_DATE = "from_date"; private static final int MARKETO_MAX_RANGE_EXTRACT = 30; private static final String IMPORTED = "imported"; public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask { @Config("from_date") Date getFromDate(); @Config("fetch_days") @ConfigDefault("1") Integer getFetchDays(); @Config("latest_fetch_time") @ConfigDefault("null") Optional getLatestFetchTime(); @ConfigInject BufferAllocator getBufferAllocator(); @Config("polling_interval_second") @ConfigDefault("60") Integer getPollingIntervalSecond(); @Config("bulk_job_timeout_second") @ConfigDefault("3600") Integer getBulkJobTimeoutSecond(); @Config("incremental") @ConfigDefault("true") Boolean getIncremental(); @Config("latest_uids") @ConfigDefault("[]") Set getPreviousUids(); @Config("to_date") @ConfigDefault("null") Optional getToDate(); void setToDate(Optional toDate); @Config("incremental_column") @ConfigDefault("\"createdAt\"") Optional getIncrementalColumn(); void setIncrementalColumn(Optional incrementalColumn); @Config("uid_column") @ConfigDefault("null") Optional getUidColumn(); void setUidColumn(Optional uidColumn); } @Override public void validateInputTask(T task) { super.validateInputTask(task); if (task.getFromDate() == null) { throw new ConfigException("From date is required for Bulk Extract"); } if (task.getFromDate().getTime() >= task.getJobStartTime().getMillis()) { throw new ConfigException("From date can't not be in future"); } if (task.getIncremental() && task.getIncrementalColumn().isPresent() && task.getIncrementalColumn().get().equals("updatedAt")) { throw new ConfigException("Column 'updatedAt' cannot be incremental imported"); } //Calculate to date DateTime toDate = getToDate(task); task.setToDate(Optional.of(toDate.toDate())); } public DateTime getToDate(T task) { Date fromDate = task.getFromDate(); DateTime dateTime = new DateTime(fromDate); DateTime toDate = dateTime.plusDays(task.getFetchDays()); if (toDate.isAfter(task.getJobStartTime())) { //Lock down to date toDate = task.getJobStartTime(); } return toDate; } @Override public ConfigDiff buildConfigDiff(T task, Schema schema, int taskCount, List taskReports) { ConfigDiff configDiff = super.buildConfigDiff(task, schema, taskCount, taskReports); Long currentLatestFetchTime = 0L; Set latestUIds = new HashSet(); String incrementalColumn = task.getIncrementalColumn().orNull(); int imported = 0; if (incrementalColumn != null && task.getIncremental()) { DateFormat df = new SimpleDateFormat(MarketoUtils.MARKETO_DATE_SIMPLE_DATE_FORMAT); for (TaskReport taskReport : taskReports) { Long latestFetchTime = taskReport.get(Long.class, LATEST_FETCH_TIME); if (latestFetchTime == null) { continue; } if (currentLatestFetchTime < latestFetchTime) { currentLatestFetchTime = latestFetchTime; latestUIds = taskReport.get(Set.class, LATEST_UID_LIST); } else if (currentLatestFetchTime.equals(latestFetchTime)) { latestUIds.addAll(taskReport.get(Set.class, LATEST_UID_LIST)); } if (taskReport.has(IMPORTED)) { imported = imported + taskReport.get(Integer.class, IMPORTED); } } // in case of we didn't import anything but search range is entirely in the past. Then we should move the the range anyway. if (imported == 0) { Date toDate = task.getToDate().orNull(); configDiff.set(FROM_DATE, df.format(toDate)); } else { // Otherwise it's should start from the currentLastFetchTime plus 1 second. configDiff.set(FROM_DATE, df.format(new DateTime(currentLatestFetchTime).plusSeconds(1).toDate())); } configDiff.set(LATEST_FETCH_TIME, currentLatestFetchTime); configDiff.set(LATEST_UID_LIST, latestUIds); } return configDiff; } @Override public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, int taskIndex, PageBuilder pageBuilder) { TaskReport taskReport = Exec.newTaskReport(); String incrementalColumn = task.getIncrementalColumn().orNull(); String uidColumn = task.getUidColumn().orNull(); if (Exec.isPreview()) { return importMockPreviewData(pageBuilder); } else { try (LineDecoderIterator decoderIterator = getLineDecoderIterator(task)) { Iterator> csvRecords = Iterators.concat(Iterators.transform(decoderIterator, new Function>>() { @Override public Iterator> apply(LineDecoder input) { return new CsvRecordIterator(input, task); } })); Long latestFetchTime = task.getLatestFetchTime().or(0L); long currentTimestamp = latestFetchTime; Set latestUids = task.getPreviousUids(); //Keep the preview code here when we can enable real preview if (Exec.isPreview()) { csvRecords = Iterators.limit(csvRecords, PREVIEW_RECORD_LIMIT); } int imported = 0; while (csvRecords.hasNext()) { Map csvRecord = csvRecords.next(); if (task.getIncremental()) { String incrementalTimeStamp = csvRecord.get(incrementalColumn); long timestamp = ISO_DATETIME_FORMAT.parseDateTime(incrementalTimeStamp).getMillis(); //Ignore records that have timestamp smaller or equal with latestFetchTime if (latestFetchTime >= timestamp) { continue; } if (!csvRecord.containsKey(incrementalColumn)) { throw new DataException("Extracted record doesn't have incremental column " + incrementalColumn); } if (uidColumn != null) { String uid = csvRecord.get(uidColumn); if (latestUids.contains(uid)) { //Duplicate value continue; } } if (currentTimestamp < timestamp) { currentTimestamp = timestamp; //switch timestamp latestUids.clear(); } else if (currentTimestamp == timestamp) { //timestamp is equal if (uidColumn != null) { String uid = csvRecord.get(uidColumn); latestUids.add(uid); } } } ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); imported = imported + 1; } taskReport.set(LATEST_FETCH_TIME, currentTimestamp); taskReport.set(LATEST_UID_LIST, latestUids); taskReport.set(IMPORTED, imported); return taskReport; } } } /** * This method should be removed when we allow skip preview phase * @param pageBuilder * @return TaskReport */ private TaskReport importMockPreviewData(final PageBuilder pageBuilder) { final JsonParser jsonParser = new JsonParser(); Schema schema = pageBuilder.getSchema(); for (int i = 1; i <= PREVIEW_RECORD_LIMIT; i++) { final int rowNum = i; schema.visitColumns(new ColumnVisitor() { @Override public void booleanColumn(Column column) { pageBuilder.setBoolean(column, false); } @Override public void longColumn(Column column) { pageBuilder.setLong(column, 12345L); } @Override public void doubleColumn(Column column) { pageBuilder.setDouble(column, 12345.123); } @Override public void stringColumn(Column column) { pageBuilder.setString(column, column.getName() + "_" + rowNum); } @Override public void timestampColumn(Column column) { pageBuilder.setTimestamp(column, Timestamp.ofEpochMilli(System.currentTimeMillis())); } @Override public void jsonColumn(Column column) { pageBuilder.setJson(column, jsonParser.parse("{\"mockKey\":\"mockValue\"}")); } }); pageBuilder.addRecord(); } return Exec.newTaskReport(); } private LineDecoderIterator getLineDecoderIterator(T task) { List dateRanges = MarketoUtils.sliceRange(new DateTime(task.getFromDate()), new DateTime(task.getToDate().orNull()), MARKETO_MAX_RANGE_EXTRACT); final Iterator iterator = dateRanges.iterator(); return new LineDecoderIterator(iterator, task); } @Override protected final Iterator getServiceRecords(MarketoService marketoService, T task) { throw new UnsupportedOperationException(); } protected abstract InputStream getExtractedStream(MarketoService service, T task, DateTime fromDate, DateTime toDate); private static class AllStringJacksonServiceRecord extends JacksonServiceRecord { public AllStringJacksonServiceRecord(ObjectNode record) { super(record); } @Override public JacksonServiceValue getValue(ValueLocator locator) { // We know that this thing only contain text. JacksonServiceValue value = super.getValue(locator); return new StringConverterJacksonServiceRecord(value.stringValue()); } } private static class StringConverterJacksonServiceRecord extends JacksonServiceValue { private String textValue; public StringConverterJacksonServiceRecord(String textValue) { super(null); this.textValue = textValue; } @Override public boolean isNull() { return textValue == null || textValue.equals("null"); } @Override public boolean booleanValue() { return Boolean.parseBoolean(textValue); } @Override public double doubleValue() { return Double.parseDouble(textValue); } @Override public Value jsonValue(JsonParser jsonParser) { return jsonParser.parse(textValue); } @Override public long longValue() { return Long.parseLong(textValue); } @Override public String stringValue() { return textValue; } @Override public Timestamp timestampValue(TimestampParser timestampParser) { return timestampParser.parse(textValue); } } private final class LineDecoderIterator implements Iterator, AutoCloseable { private LineDecoder currentLineDecoder; private Iterator dateRangeIterator; private MarketoService marketoService; private MarketoRestClient marketoRestClient; private T task; public LineDecoderIterator(Iterator dateRangeIterator, T task) { marketoRestClient = createMarketoRestClient(task); marketoService = new MarketoServiceImpl(marketoRestClient); this.dateRangeIterator = dateRangeIterator; this.task = task; } @Override public void close() { if (currentLineDecoder != null) { currentLineDecoder.close(); } if (marketoRestClient != null) { marketoRestClient.close(); } } @Override public boolean hasNext() { return dateRangeIterator.hasNext(); } @Override public LineDecoder next() { if (hasNext()) { MarketoUtils.DateRange next = dateRangeIterator.next(); InputStream extractedStream = getExtractedStream(marketoService, task, next.fromDate, next.toDate); currentLineDecoder = new LineDecoder(new InputStreamFileInput(task.getBufferAllocator(), extractedStream), task); return currentLineDecoder; } throw new NoSuchElementException(); } @Override public void remove() { throw new UnsupportedOperationException("Removed are not supported"); } } private class CsvRecordIterator implements Iterator> { private CsvTokenizer tokenizer; private List headers; private Map currentCsvRecord; public CsvRecordIterator(LineDecoder lineDecoder, T task) { tokenizer = new CsvTokenizer(lineDecoder, task); if (!tokenizer.nextFile()) { throw new DataException("Can't read extract input stream"); } headers = new ArrayList<>(); tokenizer.nextRecord(); while (tokenizer.hasNextColumn()) { headers.add(tokenizer.nextColumn()); } } @Override public boolean hasNext() { if (currentCsvRecord == null) { currentCsvRecord = getNextCSVRecord(); } return currentCsvRecord != null; } @Override public Map next() { try { if (hasNext()) { return currentCsvRecord; } } finally { currentCsvRecord = null; } throw new NoSuchElementException(); } @Override public void remove() { throw new UnsupportedOperationException(); } private Map getNextCSVRecord() { if (!tokenizer.nextRecord()) { return null; } Map kvMap = new HashMap<>(); try { int i = 0; while (tokenizer.hasNextColumn()) { kvMap.put(headers.get(i), tokenizer.nextColumnOrNull()); i++; } } catch (CsvTokenizer.InvalidValueException ex) { throw new DataException("Encounter exception when parse csv file. Please check to see if you are using the correct" + "quote or escape character.", ex); } return kvMap; } } }