package org.embulk.input.marketo.delegate; import org.embulk.EmbulkTestRuntime; import org.embulk.base.restclient.ServiceResponseMapper; import org.embulk.base.restclient.record.ValueLocator; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.input.marketo.rest.MarketoRestClient; import org.embulk.spi.Column; import org.embulk.spi.PageBuilder; import org.joda.time.DateTime; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.io.IOException; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Set; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; /** * Created by khuutantaitai on 10/3/17. */ public class ActivityBulkExtractInputPluginTest { @Rule public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); private ActivityBulkExtractInputPlugin activityBulkExtractInputPlugin; private ConfigSource configSource; private MarketoRestClient mockMarketoRestclient; @Before public void prepare() throws IOException { activityBulkExtractInputPlugin = spy(new ActivityBulkExtractInputPlugin()); ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/activity_bulk_extract_config.yaml")); mockMarketoRestclient = mock(MarketoRestClient.class); doReturn(mockMarketoRestclient).when(activityBulkExtractInputPlugin).createMarketoRestClient(any(ActivityBulkExtractInputPlugin.PluginTask.class)); } @Test public void testRun() throws InterruptedException { ActivityBulkExtractInputPlugin.PluginTask task = configSource.loadConfig(ActivityBulkExtractInputPlugin.PluginTask.class); DateTime startDate = new DateTime(task.getFromDate()); PageBuilder pageBuilder = mock(PageBuilder.class); String exportId1 = "exportId1"; String exportId2 = "exportId2"; when(mockMarketoRestclient.createActivityExtract(any(Date.class), any(Date.class))).thenReturn(exportId1).thenReturn(exportId2).thenReturn(null); when(mockMarketoRestclient.getActivitiesBulkExtractResult(eq(exportId1))).thenReturn(this.getClass().getResourceAsStream("/fixtures/activity_extract1.csv")); when(mockMarketoRestclient.getActivitiesBulkExtractResult(eq(exportId2))).thenReturn(this.getClass().getResourceAsStream("/fixtures/activity_extract2.csv")); ServiceResponseMapper mapper = activityBulkExtractInputPlugin.buildServiceResponseMapper(task); activityBulkExtractInputPlugin.validateInputTask(task); TaskReport taskReport = activityBulkExtractInputPlugin.ingestServiceData(task, mapper.createRecordImporter(), 1, pageBuilder); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); Column marketoGUID = mapper.getEmbulkSchema().lookupColumn("marketoGUID"); verify(pageBuilder, times(55)).setString(eq(marketoGUID), argumentCaptor.capture()); verify(mockMarketoRestclient, times(1)).startActitvityBulkExtract(eq(exportId1)); verify(mockMarketoRestclient, times(1)).waitActitvityExportJobComplete(eq(exportId1), eq(task.getPollingIntervalSecond()), eq(task.getBulkJobTimeoutSecond())); verify(mockMarketoRestclient, times(1)).startActitvityBulkExtract(eq(exportId2)); verify(mockMarketoRestclient, times(1)).waitActitvityExportJobComplete(eq(exportId2), eq(task.getPollingIntervalSecond()), eq(task.getBulkJobTimeoutSecond())); verify(mockMarketoRestclient, times(1)).createActivityExtract(startDate.toDate(), startDate.plusDays(30).toDate()); DateTime startDate2 = startDate.plusDays(30).plusSeconds(1); verify(mockMarketoRestclient, times(1)).createActivityExtract(startDate2.toDate(), startDate.plusDays(task.getFetchDays()).toDate()); List marketoUids = argumentCaptor.getAllValues(); assertEquals(55, marketoUids.size()); long latestFetchTime = taskReport.get(Long.class, "latest_fetch_time"); Set latestUids = taskReport.get(Set.class, "latest_uids"); assertEquals(1504888754000L, latestFetchTime); assertEquals(Arrays.asList("558681", "558682", "558683", "558684", "558685", "558686", "558687", "558688", "558689", "558690", "558691", "558692", "558693", "558694", "558695", "558696", "558697", "558698", "558699", "558700", "558701", "558702", "558703", "558704", "558705", "558706", "558707", "558708", "558709", "558710", "558711", "558712", "558713", "558714", "558716", "558717", "558718", "558719", "558720", "558721", "558722", "558723", "558724", "558725", "558726", "558727", "558728", "558729", "558730", "558731", "558732", "558733", "558734", "558735", "558736"), marketoUids); assertEquals(36, latestUids.size()); } }