package org.embulk.input.adebis.operation; import com.google.common.base.Optional; import jp.ne.ebis.extreme.ws.entity.xsd.AdConversionAttributeCondition; import jp.ne.ebis.extreme.ws.entity.xsd.HoteiAuthHeader; import jp.ne.ebis.extreme.ws.entity.xsd.HoteiAuthHeaderE; import jp.ne.ebis.extreme.ws.service.AdService; import jp.ne.ebis.extreme.ws.service.AdServiceStub; import jp.ne.ebis.extreme.ws.service.GetAdConversionAttributes; import jp.ne.ebis.extreme.ws.service.GetAdConversionAttributesResponse; import org.apache.axis2.AxisFault; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.Task; import org.embulk.spi.Column; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; import org.embulk.spi.type.Types; import java.rmi.RemoteException; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; public class AdConversionAttribute extends AbstractOperation { public interface Condition extends Task { @Config("start_date") @ConfigDefault("null") public Optional getStartDate(); @Config("end_date") @ConfigDefault("null") public Optional getEndDate(); @Config("ad_group1_ids") @ConfigDefault("null") public Optional> getAdGroup1Ids(); @Config("ad_group2_ids") @ConfigDefault("null") public Optional> getAdGroup2Ids(); @Config("media_ids") @ConfigDefault("null") public Optional> getMediaIds(); @Config("ad_ids") @ConfigDefault("null") public Optional> getAdIds(); @Config("terminal_types") @ConfigDefault("null") public Optional> getTerminalTypes(); @Config("keyword") @ConfigDefault("null") public Optional getKeyword(); @Config("conversion_ids") @ConfigDefault("null") public Optional> getConversionIds(); } public void run(PageBuilder pageBuilder) { Schema schema = pageBuilder.getSchema(); AdService adService; try { adService = new AdServiceStub(); } catch (AxisFault axisFault) { axisFault.printStackTrace(); return; } LocalDate startDate = LocalDate.parse(condition.getStartDate().get(), adebisDateFormat); LocalDate endDate = LocalDate.parse(condition.getEndDate().get(), adebisDateFormat); LocalDateTime lastRecordTime, newestRecordTime; if(getLastRecordTime().isPresent()) lastRecordTime = LocalDateTime.parse(getLastRecordTime().get(), embulkDateTimeFormat); else lastRecordTime = startDate.atStartOfDay(); if(lastRecordTime.isAfter(startDate.atStartOfDay())){ startDate = lastRecordTime.toLocalDate(); } GetAdConversionAttributes params = new GetAdConversionAttributes(); AdConversionAttributeCondition cond = new AdConversionAttributeCondition(); // Build Header HoteiAuthHeader header = new HoteiAuthHeader(); HoteiAuthHeaderE headerE = new HoteiAuthHeaderE(); header.setLogId(getLogId()); header.setLogArgument(getLogArgument()); header.setApiKey(getApiKey()); headerE.setHoteiAuthHeader(header); // Build condition cond.setStartDate(startDate.format(adebisDateFormat)); cond.setEndDate(endDate.format(adebisDateFormat)); // Build parameter params.setAdConversionAttributeCondition(cond); int offset = 0, limit = FETCH_LIMIT_AT_ONCE; params.setLimit(limit); GetAdConversionAttributesResponse response; newestRecordTime = lastRecordTime; while(true) { params.setOffset(offset); try { response = adService.getAdConversionAttributes(params, headerE); } catch (RemoteException e) { e.printStackTrace(); return; } catch (NullPointerException e) { return; } jp.ne.ebis.extreme.ws.entity.xsd.AdConversionAttribute[] results = response.get_return(); if (results != null) offset += limit; else break; for (jp.ne.ebis.extreme.ws.entity.xsd.AdConversionAttribute record : results) { Boolean isExcluded = false; for (Column c : schema.getColumns()) { switch (c.getName()) { case "createdAt": long unixtime = System.currentTimeMillis() / 1000L; org.embulk.spi.time.Timestamp timestamp = org.embulk.spi.time.Timestamp.ofEpochSecond(unixtime); pageBuilder.setTimestamp(c, timestamp); break; case "amount": if (record.isAmountSpecified()) pageBuilder.setLong(c, record.getAmount()); else pageBuilder.setNull(c); break; case "conversionDate": if (record.isConversionDateSpecified()) { String conversionDate = record.getConversionDate(); LocalDateTime recordTime = LocalDateTime.parse(conversionDate, adebisDateTimeFormat); if (recordTime.isAfter(lastRecordTime)) { pageBuilder.setTimestamp(c, toTimestamp(conversionDate)); if (recordTime.isAfter(newestRecordTime)) newestRecordTime = recordTime; } else { isExcluded = true; } } else pageBuilder.setNull(c); break; case "conversionId": if (record.isConversionIdSpecified()) pageBuilder.setLong(c, record.getConversionId()); else pageBuilder.setNull(c); break; case "directAd": if (record.isDirectAdSpecified()) pageBuilder.setString(c, record.getDirectAd()); else pageBuilder.setNull(c); break; case "firstAd": if (record.isFirstAdSpecified()) pageBuilder.setString(c, record.getFirstAd()); else pageBuilder.setNull(c); break; case "firstAdDate": if (record.isFirstAdDateSpecified()) { String fistAdDateSpecified = record.getFirstAdDate(); pageBuilder.setTimestamp(c, toTimestamp(fistAdDateSpecified)); } else pageBuilder.setNull(c); break; case "indirectAd2": if (record.isIndirectAd2Specified()) pageBuilder.setString(c, record.getIndirectAd2()); else pageBuilder.setNull(c); break; case "indirectAd3": if (record.isIndirectAd3Specified()) pageBuilder.setString(c, record.getIndirectAd3()); else pageBuilder.setNull(c); break; case "indirectAd4": if (record.isIndirectAd4Specified()) pageBuilder.setString(c, record.getIndirectAd4()); else pageBuilder.setNull(c); break; case "indirectAd5": if (record.isIndirectAd5Specified()) pageBuilder.setString(c, record.getIndirectAd5()); else pageBuilder.setNull(c); break; case "indirectAd6": if (record.isIndirectAd6Specified()) pageBuilder.setString(c, record.getIndirectAd6()); else pageBuilder.setNull(c); break; case "indirectAd7": if (record.isIndirectAd7Specified()) pageBuilder.setString(c, record.getIndirectAd7()); else pageBuilder.setNull(c); break; case "indirectAd8": if (record.isIndirectAd8Specified()) pageBuilder.setString(c, record.getIndirectAd8()); else pageBuilder.setNull(c); break; case "indirectAd9": if (record.isIndirectAd9Specified()) pageBuilder.setString(c, record.getIndirectAd9()); else pageBuilder.setNull(c); break; case "indirectAd10": if (record.isIndirectAd10Specified()) pageBuilder.setString(c, record.getIndirectAd10()); else pageBuilder.setNull(c); break; case "latencyTime": if (record.isLatencyTimeSpecified()) pageBuilder.setLong(c, record.getLatencyTime()); else pageBuilder.setNull(c); break; case "memberName": if (record.isMemberNameSpecified()) pageBuilder.setString(c, record.getMemberName()); else pageBuilder.setNull(c); break; case "other1": if (record.isOther1Specified()) pageBuilder.setString(c, record.getOther1()); else pageBuilder.setNull(c); break; case "other2": if (record.isOther2Specified()) pageBuilder.setString(c, record.getOther2()); else pageBuilder.setNull(c); break; case "other3": if (record.isOther3Specified()) pageBuilder.setString(c, record.getOther3()); else pageBuilder.setNull(c); break; case "other4": if (record.isOther4Specified()) pageBuilder.setString(c, record.getOther4()); else pageBuilder.setNull(c); break; case "other5": if (record.isOther5Specified()) pageBuilder.setString(c, record.getOther5()); else pageBuilder.setNull(c); break; case "terminalType": if (record.isTerminalTypeSpecified()) pageBuilder.setLong(c, record.getTerminalType()); else pageBuilder.setNull(c); break; case "userId": if (record.isUserIdSpecified()) pageBuilder.setString(c, record.getUserId()); else pageBuilder.setNull(c); break; default: break; } if (isExcluded) break; } if (isExcluded) pageBuilder.flush(); else pageBuilder.addRecord(); } } setLastRecordTime(Optional.of(newestRecordTime.format(embulkDateTimeFormat))); } public Schema buildSchema() { return Schema.builder() .add("createdAt", Types.TIMESTAMP) .add("amount", Types.LONG) .add("conversionDate", Types.TIMESTAMP) .add("conversionId", Types.LONG) .add("directAd", Types.STRING) .add("firstAd", Types.STRING) .add("firstAdDate", Types.TIMESTAMP) .add("indirectAd2", Types.STRING) .add("indirectAd3", Types.STRING) .add("indirectAd4", Types.STRING) .add("indirectAd5", Types.STRING) .add("indirectAd6", Types.STRING) .add("indirectAd7", Types.STRING) .add("indirectAd8", Types.STRING) .add("indirectAd9", Types.STRING) .add("indirectAd10", Types.STRING) .add("latencyTime", Types.LONG) .add("memberName", Types.STRING) .add("other1", Types.STRING) .add("other2", Types.STRING) .add("other3", Types.STRING) .add("other4", Types.STRING) .add("other5", Types.STRING) .add("terminalType", Types.LONG) .add("userId", Types.STRING) .build(); } }