package org.embulk.input.marketo.delegate; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.embulk.EmbulkTestRuntime; import org.embulk.base.restclient.ServiceResponseMapper; import org.embulk.base.restclient.record.RecordImporter; import org.embulk.base.restclient.record.ValueLocator; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource; import org.embulk.input.marketo.rest.MarketoRestClient; import org.embulk.input.marketo.rest.RecordPagingIterable; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.IOException; import java.util.List; import static org.junit.Assert.assertArrayEquals; /** * Created by tai.khuu on 10/10/17. */ public class CampaignInputPluginTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Rule public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); private ConfigSource configSource; private CampaignInputPlugin campaignInputPlugin; private MarketoRestClient mockMarketoRestClient; @Before public void setUp() throws Exception { campaignInputPlugin = Mockito.spy(new CampaignInputPlugin()); ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); mockMarketoRestClient = Mockito.mock(MarketoRestClient.class); Mockito.doReturn(mockMarketoRestClient).when(campaignInputPlugin).createMarketoRestClient(Mockito.any(CampaignInputPlugin.PluginTask.class)); } @Test public void testRun() throws IOException { RecordPagingIterable mockRecordPagingIterable = Mockito.mock(RecordPagingIterable.class); JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class); List objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/campaign_response_full.json"), javaType); Mockito.when(mockRecordPagingIterable.iterator()).thenReturn(objectNodeList.iterator()); Mockito.when(mockMarketoRestClient.getCampaign()).thenReturn(mockRecordPagingIterable); CampaignInputPlugin.PluginTask task = configSource.loadConfig(CampaignInputPlugin.PluginTask.class); ServiceResponseMapper mapper = campaignInputPlugin.buildServiceResponseMapper(task); RecordImporter recordImporter = mapper.createRecordImporter(); PageBuilder mockPageBuilder = Mockito.mock(PageBuilder.class); campaignInputPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder); Mockito.verify(mockMarketoRestClient, Mockito.times(1)).getCampaign(); Schema embulkSchema = mapper.getEmbulkSchema(); ArgumentCaptor longArgumentCaptor = ArgumentCaptor.forClass(Long.class); Mockito.verify(mockPageBuilder, Mockito.times(10)).setLong(Mockito.eq(embulkSchema.lookupColumn("id")), longArgumentCaptor.capture()); List allValues = longArgumentCaptor.getAllValues(); assertArrayEquals(new Long[]{1003L, 1004L, 1005L, 1006L, 1007L, 1008L, 1029L, 1048L, 1051L, 1065L}, allValues.toArray()); } }