package org.embulk.input.adebis.operation; import com.google.common.base.Optional; import jp.ne.ebis.extreme.ws.AdServiceStub; import org.apache.axis2.AxisFault; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.Task; import org.embulk.spi.Column; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; import org.embulk.spi.type.Types; import java.rmi.RemoteException; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; public class AdConversionAttribute extends AbstractOperation { public interface Condition extends Task { @Config("start_date") @ConfigDefault("null") public Optional getStartDate(); @Config("end_date") @ConfigDefault("null") public Optional getEndDate(); @Config("ad_group1_ids") @ConfigDefault("null") public Optional> getAdGroup1Ids(); @Config("ad_group2_ids") @ConfigDefault("null") public Optional> getAdGroup2Ids(); @Config("media_ids") @ConfigDefault("null") public Optional> getMediaIds(); @Config("ad_ids") @ConfigDefault("null") public Optional> getAdIds(); @Config("terminal_types") @ConfigDefault("null") public Optional> getTerminalTypes(); @Config("keyword") @ConfigDefault("null") public Optional getKeyword(); @Config("conversion_ids") @ConfigDefault("null") public Optional> getConversionIds(); } public void run(PageBuilder pageBuilder) { Schema schema = pageBuilder.getSchema(); AdServiceStub stub; try { stub = new AdServiceStub(); } catch (AxisFault axisFault) { axisFault.printStackTrace(); return; } LocalDate startDate = LocalDate.parse(condition.getStartDate().get(), adebisDateFormat); LocalDate endDate = LocalDate.parse(condition.getEndDate().get(), adebisDateFormat); LocalDateTime lastRecordTime, newestRecordTime; if(getLastRecordTime().isPresent()) lastRecordTime = LocalDateTime.parse(getLastRecordTime().get(), embulkDateTimeFormat); else lastRecordTime = startDate.atStartOfDay(); if(lastRecordTime.isAfter(startDate.atStartOfDay())){ startDate = lastRecordTime.toLocalDate(); } AdServiceStub.GetAdConversionAttributes params = new AdServiceStub.GetAdConversionAttributes(); AdServiceStub.AdConversionAttributeCondition cond = new AdServiceStub.AdConversionAttributeCondition(); // Build Header AdServiceStub.HoteiAuthHeader header = new AdServiceStub.HoteiAuthHeader(); AdServiceStub.HoteiAuthHeaderE headerE = new AdServiceStub.HoteiAuthHeaderE(); header.setLogId(getLogId()); header.setLogArgument(getLogArgument()); header.setApiKey(getApiKey()); headerE.setHoteiAuthHeader(header); // Build condition cond.setStartDate(startDate.format(adebisDateFormat)); cond.setEndDate(endDate.format(adebisDateFormat)); // Build parameter params.setAdConversionAttributeCondition(cond); int offset = 0, limit = FETCH_LIMIT_AT_ONCE; params.setLimit(limit); AdServiceStub.GetAdConversionAttributesResponse response; newestRecordTime = lastRecordTime; while(true) { params.setOffset(offset); try { response = stub.getAdConversionAttributes(params, headerE); } catch (RemoteException e) { e.printStackTrace(); return; } catch (NullPointerException e) { return; } AdServiceStub.AdConversionAttribute[] results = response.get_return(); if (results != null) offset += limit; else break; for (AdServiceStub.AdConversionAttribute record : results) { Boolean isExcluded = false; for (Column c : schema.getColumns()) { switch (c.getName()) { case "amount": if (record.isAmountSpecified()) pageBuilder.setLong(c, record.getAmount()); else pageBuilder.setNull(c); break; case "conversionDate": if (record.isConversionDateSpecified()) { String conversionDate = record.getConversionDate(); LocalDateTime recordTime = LocalDateTime.parse(conversionDate, adebisDateTimeFormat); if (recordTime.isAfter(lastRecordTime)) { pageBuilder.setTimestamp(c, toTimestamp(conversionDate)); if (recordTime.isAfter(newestRecordTime)) newestRecordTime = recordTime; } else { isExcluded = true; } } else pageBuilder.setNull(c); break; case "conversionId": if (record.isConversionIdSpecified()) pageBuilder.setLong(c, record.getConversionId()); else pageBuilder.setNull(c); break; case "directAd": if (record.isDirectAdSpecified()) pageBuilder.setString(c, record.getDirectAd()); else pageBuilder.setNull(c); break; case "firstAd": if (record.isFirstAdSpecified()) pageBuilder.setString(c, record.getFirstAd()); else pageBuilder.setNull(c); break; case "firstAdDate": if (record.isFirstAdDateSpecified()) { String fistAdDateSpecified = record.getFirstAdDate(); pageBuilder.setTimestamp(c, toTimestamp(fistAdDateSpecified)); } else pageBuilder.setNull(c); break; case "indirectAd2": if (record.isIndirectAd2Specified()) pageBuilder.setString(c, record.getIndirectAd2()); else pageBuilder.setNull(c); break; case "indirectAd3": if (record.isIndirectAd3Specified()) pageBuilder.setString(c, record.getIndirectAd3()); else pageBuilder.setNull(c); break; case "indirectAd4": if (record.isIndirectAd4Specified()) pageBuilder.setString(c, record.getIndirectAd4()); else pageBuilder.setNull(c); break; case "indirectAd5": if (record.isIndirectAd5Specified()) pageBuilder.setString(c, record.getIndirectAd5()); else pageBuilder.setNull(c); break; case "indirectAd6": if (record.isIndirectAd6Specified()) pageBuilder.setString(c, record.getIndirectAd6()); else pageBuilder.setNull(c); break; case "indirectAd7": if (record.isIndirectAd7Specified()) pageBuilder.setString(c, record.getIndirectAd7()); else pageBuilder.setNull(c); break; case "indirectAd8": if (record.isIndirectAd8Specified()) pageBuilder.setString(c, record.getIndirectAd8()); else pageBuilder.setNull(c); break; case "indirectAd9": if (record.isIndirectAd9Specified()) pageBuilder.setString(c, record.getIndirectAd9()); else pageBuilder.setNull(c); break; case "indirectAd10": if (record.isIndirectAd10Specified()) pageBuilder.setString(c, record.getIndirectAd10()); else pageBuilder.setNull(c); break; case "latencyTime": if (record.isLatencyTimeSpecified()) pageBuilder.setLong(c, record.getLatencyTime()); else pageBuilder.setNull(c); break; case "memberName": if (record.isMemberNameSpecified()) pageBuilder.setString(c, record.getMemberName()); else pageBuilder.setNull(c); break; case "other1": if (record.isOther1Specified()) pageBuilder.setString(c, record.getOther1()); else pageBuilder.setNull(c); break; case "other2": if (record.isOther2Specified()) pageBuilder.setString(c, record.getOther2()); else pageBuilder.setNull(c); break; case "other3": if (record.isOther3Specified()) pageBuilder.setString(c, record.getOther3()); else pageBuilder.setNull(c); break; case "other4": if (record.isOther4Specified()) pageBuilder.setString(c, record.getOther4()); else pageBuilder.setNull(c); break; case "other5": if (record.isOther5Specified()) pageBuilder.setString(c, record.getOther5()); else pageBuilder.setNull(c); break; case "terminalType": if (record.isTerminalTypeSpecified()) pageBuilder.setLong(c, record.getTerminalType()); else pageBuilder.setNull(c); break; case "userId": if (record.isUserIdSpecified()) pageBuilder.setString(c, record.getUserId()); else pageBuilder.setNull(c); break; default: break; } if (isExcluded) break; } if (isExcluded) pageBuilder.flush(); else pageBuilder.addRecord(); } } setLastRecordTime(Optional.of(newestRecordTime.format(embulkDateTimeFormat))); } public Schema buildSchema() { return Schema.builder() .add("amount", Types.LONG) .add("conversionDate", Types.TIMESTAMP) .add("conversionId", Types.LONG) .add("directAd", Types.STRING) .add("firstAd", Types.STRING) .add("firstAdDate", Types.TIMESTAMP) .add("indirectAd2", Types.STRING) .add("indirectAd3", Types.STRING) .add("indirectAd4", Types.STRING) .add("indirectAd5", Types.STRING) .add("indirectAd6", Types.STRING) .add("indirectAd7", Types.STRING) .add("indirectAd8", Types.STRING) .add("indirectAd9", Types.STRING) .add("indirectAd10", Types.STRING) .add("latencyTime", Types.LONG) .add("memberName", Types.STRING) .add("other1", Types.STRING) .add("other2", Types.STRING) .add("other3", Types.STRING) .add("other4", Types.STRING) .add("other5", Types.STRING) .add("terminalType", Types.LONG) .add("userId", Types.STRING) .build(); } }