src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.5 vs src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.6

- old
+ new

@@ -165,10 +165,11 @@ @Override public ConfigDiff transaction(final ConfigSource config, final Control control) { final PluginTask task = config.loadConfig(PluginTask.class); validateInputTask(task); + final Schema schema = task.getColumns().toSchema(); int taskCount = 1; // For non-incremental target, we will split records based on number of pages. 100 records per page // In preview, run with taskCount = 1 @@ -197,10 +198,25 @@ @Override public TaskReport run(final TaskSource taskSource, final Schema schema, final int taskIndex, final PageOutput output) { final PluginTask task = taskSource.loadTask(PluginTask.class); + + if (getZendeskService(task).isSupportIncremental() && !isValidTimeRange(task)) { + if (Exec.isPreview()) { + throw new ConfigException("Invalid End time. End time is greater than current time"); + } + + logger.warn("The end time, '" + task.getEndTime().get() + "', is greater than the current time. No records will be imported"); + + // we just need to store config_diff when incremental_mode is enable + if (task.getIncremental()) { + return buildTaskReportKeepOldStartAndEndTime(task); + } + return Exec.newTaskReport(); + } + try (final PageBuilder pageBuilder = getPageBuilder(schema, output)) { final TaskReport taskReport = getZendeskService(task).addRecordToImporter(taskIndex, getRecordImporter(schema, pageBuilder)); pageBuilder.finish(); return taskReport; } @@ -210,10 +226,13 @@ public ConfigDiff guess(final ConfigSource config) { config.set("columns", new ObjectMapper().createArrayNode()); final PluginTask task = config.loadConfig(PluginTask.class); validateInputTask(task); + if (!isValidTimeRange(task)) { + throw new ConfigException("Invalid End time. End time is greater than current time"); + } return Exec.newConfigDiff().set("columns", buildColumns(task)); } @VisibleForTesting protected PageBuilder getPageBuilder(final Schema schema, final PageOutput output) @@ -232,10 +251,18 @@ taskReport.get(JsonNode.class, ZendeskConstants.Field.START_TIME).asLong()), ZoneOffset.UTC); configDiff.set(ZendeskConstants.Field.START_TIME, offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT))); } + + if (taskReport.has(ZendeskConstants.Field.END_TIME)) { + final OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond( + taskReport.get(JsonNode.class, ZendeskConstants.Field.END_TIME).asLong()), ZoneOffset.UTC); + + configDiff.set(ZendeskConstants.Field.END_TIME, + offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT))); + } } return configDiff; } private JsonNode buildColumns(final PluginTask task) @@ -362,10 +389,11 @@ task.getAppMarketPlaceOrgId().isPresent()); validateCredentials(task); validateIncremental(task); validateCustomObject(task); validateUserEvent(task); + validateTime(task); } private void validateCredentials(PluginTask task) { switch (task.getAuthenticationMethod()) { @@ -437,11 +465,16 @@ { if (task.getTarget().equals(Target.USER_EVENTS)) { if (!task.getProfileSource().isPresent()) { throw new ConfigException("Profile Source is required for User Event Target"); } + } + } + private void validateTime(PluginTask task) + { + if (getZendeskService(task).isSupportIncremental()) { // Can't set end_time to 0, so it should be valid task.getEndTime().ifPresent(time -> { if (!ZendeskDateUtils.supportedTimeFormat(task.getEndTime().get()).isPresent()) { throw new ConfigException("End Time should follow these format " + ZendeskConstants.Misc.SUPPORT_DATE_TIME_FORMAT.toString()); } @@ -450,7 +483,27 @@ if (task.getStartTime().isPresent() && task.getEndTime().isPresent() && ZendeskDateUtils.getStartTime(task.getStartTime().get()) > ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get())) { throw new ConfigException("End Time should be later or equal than Start Time"); } } + } + + private boolean isValidTimeRange(PluginTask task) + { + return !task.getEndTime().isPresent() || ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get()) <= Instant.now().getEpochSecond(); + } + + private TaskReport buildTaskReportKeepOldStartAndEndTime(PluginTask task) + { + final TaskReport taskReport = Exec.newTaskReport(); + + if (task.getStartTime().isPresent()) { + taskReport.set(ZendeskConstants.Field.START_TIME, ZendeskDateUtils.isoToEpochSecond(task.getStartTime().get())); + } + + if (task.getEndTime().isPresent()) { + taskReport.set(ZendeskConstants.Field.END_TIME, ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get())); + } + + return taskReport; } }