src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.5 vs src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.6
- old
+ new
@@ -165,10 +165,11 @@
@Override
public ConfigDiff transaction(final ConfigSource config, final Control control)
{
final PluginTask task = config.loadConfig(PluginTask.class);
validateInputTask(task);
+
final Schema schema = task.getColumns().toSchema();
int taskCount = 1;
// For non-incremental target, we will split records based on number of pages. 100 records per page
// In preview, run with taskCount = 1
@@ -197,10 +198,25 @@
@Override
public TaskReport run(final TaskSource taskSource, final Schema schema, final int taskIndex, final PageOutput output)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
+
+ if (getZendeskService(task).isSupportIncremental() && !isValidTimeRange(task)) {
+ if (Exec.isPreview()) {
+ throw new ConfigException("Invalid End time. End time is greater than current time");
+ }
+
+ logger.warn("The end time, '" + task.getEndTime().get() + "', is greater than the current time. No records will be imported");
+
+ // we just need to store config_diff when incremental_mode is enable
+ if (task.getIncremental()) {
+ return buildTaskReportKeepOldStartAndEndTime(task);
+ }
+ return Exec.newTaskReport();
+ }
+
try (final PageBuilder pageBuilder = getPageBuilder(schema, output)) {
final TaskReport taskReport = getZendeskService(task).addRecordToImporter(taskIndex, getRecordImporter(schema, pageBuilder));
pageBuilder.finish();
return taskReport;
}
@@ -210,10 +226,13 @@
public ConfigDiff guess(final ConfigSource config)
{
config.set("columns", new ObjectMapper().createArrayNode());
final PluginTask task = config.loadConfig(PluginTask.class);
validateInputTask(task);
+ if (!isValidTimeRange(task)) {
+ throw new ConfigException("Invalid End time. End time is greater than current time");
+ }
return Exec.newConfigDiff().set("columns", buildColumns(task));
}
@VisibleForTesting
protected PageBuilder getPageBuilder(final Schema schema, final PageOutput output)
@@ -232,10 +251,18 @@
taskReport.get(JsonNode.class, ZendeskConstants.Field.START_TIME).asLong()), ZoneOffset.UTC);
configDiff.set(ZendeskConstants.Field.START_TIME,
offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT)));
}
+
+ if (taskReport.has(ZendeskConstants.Field.END_TIME)) {
+ final OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(
+ taskReport.get(JsonNode.class, ZendeskConstants.Field.END_TIME).asLong()), ZoneOffset.UTC);
+
+ configDiff.set(ZendeskConstants.Field.END_TIME,
+ offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT)));
+ }
}
return configDiff;
}
private JsonNode buildColumns(final PluginTask task)
@@ -362,10 +389,11 @@
task.getAppMarketPlaceOrgId().isPresent());
validateCredentials(task);
validateIncremental(task);
validateCustomObject(task);
validateUserEvent(task);
+ validateTime(task);
}
private void validateCredentials(PluginTask task)
{
switch (task.getAuthenticationMethod()) {
@@ -437,11 +465,16 @@
{
if (task.getTarget().equals(Target.USER_EVENTS)) {
if (!task.getProfileSource().isPresent()) {
throw new ConfigException("Profile Source is required for User Event Target");
}
+ }
+ }
+ private void validateTime(PluginTask task)
+ {
+ if (getZendeskService(task).isSupportIncremental()) {
// Can't set end_time to 0, so it should be valid
task.getEndTime().ifPresent(time -> {
if (!ZendeskDateUtils.supportedTimeFormat(task.getEndTime().get()).isPresent()) {
throw new ConfigException("End Time should follow these format " + ZendeskConstants.Misc.SUPPORT_DATE_TIME_FORMAT.toString());
}
@@ -450,7 +483,27 @@
if (task.getStartTime().isPresent() && task.getEndTime().isPresent()
&& ZendeskDateUtils.getStartTime(task.getStartTime().get()) > ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get())) {
throw new ConfigException("End Time should be later or equal than Start Time");
}
}
+ }
+
+ private boolean isValidTimeRange(PluginTask task)
+ {
+ return !task.getEndTime().isPresent() || ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get()) <= Instant.now().getEpochSecond();
+ }
+
+ private TaskReport buildTaskReportKeepOldStartAndEndTime(PluginTask task)
+ {
+ final TaskReport taskReport = Exec.newTaskReport();
+
+ if (task.getStartTime().isPresent()) {
+ taskReport.set(ZendeskConstants.Field.START_TIME, ZendeskDateUtils.isoToEpochSecond(task.getStartTime().get()));
+ }
+
+ if (task.getEndTime().isPresent()) {
+ taskReport.set(ZendeskConstants.Field.END_TIME, ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get()));
+ }
+
+ return taskReport;
}
}