src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.4 vs src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.5

- old
+ new

@@ -3,14 +3,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import org.apache.http.HttpStatus; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; @@ -18,18 +16,19 @@ import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.exec.GuessExecutor; import org.embulk.input.zendesk.models.AuthenticationMethod; import org.embulk.input.zendesk.models.Target; -import org.embulk.input.zendesk.models.ZendeskException; +import org.embulk.input.zendesk.services.ZendeskCustomObjectService; +import org.embulk.input.zendesk.services.ZendeskNPSService; +import org.embulk.input.zendesk.services.ZendeskService; import org.embulk.input.zendesk.services.ZendeskSupportAPIService; +import org.embulk.input.zendesk.services.ZendeskUserEventService; import org.embulk.input.zendesk.utils.ZendeskConstants; import org.embulk.input.zendesk.utils.ZendeskDateUtils; import org.embulk.input.zendesk.utils.ZendeskUtils; -import org.embulk.input.zendesk.utils.ZendeskValidatorUtils; import org.embulk.spi.Buffer; -import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.InputPlugin; import org.embulk.spi.PageBuilder; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; @@ -43,18 +42,14 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.StreamSupport; public class ZendeskInputPlugin implements InputPlugin @@ -131,31 +126,58 @@ @Config("app_marketplace_app_id") @ConfigDefault("null") Optional<String> getAppMarketPlaceAppId(); + @Config("object_types") + @ConfigDefault("[]") + List<String> getObjectTypes(); + + @Config("relationship_types") + @ConfigDefault("[]") + List<String> getRelationshipTypes(); + + @Config("profile_source") + @ConfigDefault("null") + Optional<String> getProfileSource(); + + @Config("end_time") + @ConfigDefault("null") + Optional<String> getEndTime(); + + @Config("user_event_type") + @ConfigDefault("null") + Optional<String> getUserEventType(); + + @Config("user_event_source") + @ConfigDefault("null") + Optional<String> getUserEventSource(); + @Config("columns") SchemaConfig getColumns(); } - private static final Logger logger = Exec.getLogger(ZendeskInputPlugin.class); + private ZendeskService zendeskService; - private ZendeskSupportAPIService zendeskSupportAPIService; + private RecordImporter recordImporter; + private static final Logger logger = Exec.getLogger(ZendeskInputPlugin.class); + @Override public ConfigDiff transaction(final ConfigSource config, final Control control) { final PluginTask task = config.loadConfig(PluginTask.class); - ZendeskValidatorUtils.validateInputTask(task, getZendeskSupportAPIService(task)); + validateInputTask(task); final Schema schema = task.getColumns().toSchema(); int taskCount = 1; // For non-incremental target, we will split records based on number of pages. 100 records per page // In preview, run with taskCount = 1 - if (!ZendeskUtils.isSupportAPIIncremental(task.getTarget()) && !Exec.isPreview()) { - final JsonNode result = getZendeskSupportAPIService(task).getData("", 0, false, 0); - if (result.has(ZendeskConstants.Field.COUNT) && result.get(ZendeskConstants.Field.COUNT).isInt()) { + if (!Exec.isPreview() && !getZendeskService(task).isSupportIncremental() && getZendeskService(task) instanceof ZendeskSupportAPIService) { + final JsonNode result = getZendeskService(task).getDataFromPath("", 0, false, 0); + if (result.has(ZendeskConstants.Field.COUNT) && !result.get(ZendeskConstants.Field.COUNT).isNull() + && result.get(ZendeskConstants.Field.COUNT).isInt()) { taskCount = ZendeskUtils.numberToSplitWithHintingInTask(result.get(ZendeskConstants.Field.COUNT).asInt()); } } return resume(task.dump(), schema, taskCount, control); } @@ -176,36 +198,26 @@ @Override public TaskReport run(final TaskSource taskSource, final Schema schema, final int taskIndex, final PageOutput output) { final PluginTask task = taskSource.loadTask(PluginTask.class); try (final PageBuilder pageBuilder = getPageBuilder(schema, output)) { - final TaskReport taskReport = ingestServiceData(task, taskIndex, schema, pageBuilder); + final TaskReport taskReport = getZendeskService(task).addRecordToImporter(taskIndex, getRecordImporter(schema, pageBuilder)); pageBuilder.finish(); return taskReport; } } @Override public ConfigDiff guess(final ConfigSource config) { config.set("columns", new ObjectMapper().createArrayNode()); final PluginTask task = config.loadConfig(PluginTask.class); - ZendeskValidatorUtils.validateInputTask(task, getZendeskSupportAPIService(task)); + validateInputTask(task); return Exec.newConfigDiff().set("columns", buildColumns(task)); } @VisibleForTesting - protected ZendeskSupportAPIService getZendeskSupportAPIService(final PluginTask task) - { - if (this.zendeskSupportAPIService == null) { - this.zendeskSupportAPIService = new ZendeskSupportAPIService(task); - } - this.zendeskSupportAPIService.setTask(task); - return this.zendeskSupportAPIService; - } - - @VisibleForTesting protected PageBuilder getPageBuilder(final Schema schema, final PageOutput output) { return new PageBuilder(Exec.getBufferAllocator(), schema, output); } @@ -218,157 +230,30 @@ if (taskReport.has(ZendeskConstants.Field.START_TIME)) { final OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond( taskReport.get(JsonNode.class, ZendeskConstants.Field.START_TIME).asLong()), ZoneOffset.UTC); configDiff.set(ZendeskConstants.Field.START_TIME, - offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT))); + offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT))); } } return configDiff; } - private TaskReport ingestServiceData(final PluginTask task, final int taskIndex, - final Schema schema, final PageBuilder pageBuilder) - { - final TaskReport taskReport = Exec.newTaskReport(); - - if (ZendeskUtils.isSupportAPIIncremental(task.getTarget())) { - importDataForIncremental(task, schema, pageBuilder, taskReport); - } - else { - importDataForNonIncremental(task, taskIndex, schema, pageBuilder); - } - - return taskReport; - } - - private void importDataForIncremental(final PluginTask task, final Schema schema, - final PageBuilder pageBuilder, final TaskReport taskReport) - { - long startTime = 0; - - if (ZendeskUtils.isSupportAPIIncremental(task.getTarget()) && task.getStartTime().isPresent()) { - startTime = ZendeskDateUtils.isoToEpochSecond(task.getStartTime().get()); - } - - // For incremental target, we will run in one task but split in multiple threads inside for data deduplication. - // Run with incremental will contain duplicated data. - ThreadPoolExecutor pool = null; - try { - Set<String> knownIds = ConcurrentHashMap.newKeySet(); - pool = new ThreadPoolExecutor( - 10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>() - ); - - while (true) { - int recordCount = 0; - - // Page argument isn't used in incremental API so we just set it to 0 - final JsonNode result = getZendeskSupportAPIService(task).getData("", 0, false, startTime); - final Iterator<JsonNode> iterator = getListRecords(result, task.getTarget().getJsonName()); - - int numberOfRecords = 0; - if (result.has(ZendeskConstants.Field.COUNT)) { - numberOfRecords = result.get(ZendeskConstants.Field.COUNT).asInt(); - } - - while (iterator.hasNext()) { - final JsonNode recordJsonNode = iterator.next(); - - if (isUpdatedBySystem(recordJsonNode, startTime)) { - continue; - } - - if (task.getDedup()) { - String recordID = recordJsonNode.get(ZendeskConstants.Field.ID).asText(); - if (knownIds.contains(recordID)) { - continue; - } - knownIds.add(recordID); - } - - pool.submit(() -> fetchData(recordJsonNode, task, schema, pageBuilder)); - recordCount++; - if (Exec.isPreview()) { - return; - } - } - logger.info("Fetched '{}' records from start_time '{}'", recordCount, startTime); - - if (task.getIncremental()) { - if (result.has(ZendeskConstants.Field.END_TIME) && !result.get(ZendeskConstants.Field.END_TIME).isNull() - && result.has(task.getTarget().getJsonName())) { - // NOTE: start_time compared as "=>", not ">". - // If we will use end_time for next start_time, we got the same record that is last fetched - // end_time + 1 is workaround for that - taskReport.set(ZendeskConstants.Field.START_TIME, result.get(ZendeskConstants.Field.END_TIME).asLong() + 1); - } - else { - // Sometimes no record and no end_time fetched on the job, but we should generate start_time on config_diff. - taskReport.set(ZendeskConstants.Field.START_TIME, Instant.now().getEpochSecond()); - } - } - - if (numberOfRecords < ZendeskConstants.Misc.MAXIMUM_RECORDS_INCREMENTAL) { - break; - } - else { - startTime = result.get(ZendeskConstants.Field.END_TIME).asLong(); - } - } - } - finally { - if (pool != null) { - pool.shutdown(); - try { - pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } - catch (final InterruptedException e) { - logger.warn("Error when wait pool to finish"); - throw Throwables.propagate(e); - } - } - } - } - - private void importDataForNonIncremental(final PluginTask task, final int taskIndex, final Schema schema, - final PageBuilder pageBuilder) - { - // Page start from 1 => page = taskIndex + 1 - final JsonNode result = getZendeskSupportAPIService(task).getData("", taskIndex + 1, false, 0); - final Iterator<JsonNode> iterator = getListRecords(result, task.getTarget().getJsonName()); - - while (iterator.hasNext()) { - fetchData(iterator.next(), task, schema, pageBuilder); - - if (Exec.isPreview()) { - break; - } - } - } - - private Iterator<JsonNode> getListRecords(JsonNode result, String targetJsonName) - { - if (!result.has(targetJsonName) || !result.get(targetJsonName).isArray()) { - throw new DataException(String.format("Missing '%s' from Zendesk API response", targetJsonName)); - } - return result.get(targetJsonName).elements(); - } - private JsonNode buildColumns(final PluginTask task) { - JsonNode jsonNode = getZendeskSupportAPIService(task).getData("", 0, true, 0); + JsonNode jsonNode = getZendeskService(task).getDataFromPath("", 0, true, 0); String targetName = task.getTarget().getJsonName(); - if (jsonNode.has(targetName) && jsonNode.get(targetName).isArray() && jsonNode.get(targetName).size() > 0) { + if (jsonNode.has(targetName) && !jsonNode.get(targetName).isNull() && jsonNode.get(targetName).isArray() && jsonNode.get(targetName).size() > 0) { return addAllColumnsToSchema(jsonNode, task.getTarget(), task.getIncludes()); } throw new ConfigException("Could not guess schema due to empty data set"); } private final Pattern idPattern = Pattern.compile(ZendeskConstants.Regex.ID); + private JsonNode addAllColumnsToSchema(final JsonNode jsonNode, final Target target, final List<String> includes) { final JsonNode sample = new ObjectMapper().valueToTree(StreamSupport.stream( jsonNode.get(target.getJsonName()).spliterator(), false).limit(10).collect(Collectors.toList())); final Buffer bufferSample = Buffer.copyOf(sample.toString().getBytes(StandardCharsets.UTF_8)); @@ -392,10 +277,15 @@ if (type.equals(Types.TIMESTAMP.getName())) { entry.remove("format"); } entry.put("type", Types.LONG.getName()); } + + // Id of User Events target is more suitable for String + if (target.equals(Target.USER_EVENTS)) { + entry.put("type", Types.STRING.getName()); + } } else if (idPattern.matcher(name).find()) { if (type.equals(Types.TIMESTAMP.getName())) { entry.remove("format"); } @@ -415,62 +305,152 @@ .put("name", include) .put("type", Types.JSON.getName())) .forEach(arrayNode::add); } - private void fetchData(final JsonNode jsonNode, final PluginTask task, final Schema schema, - final PageBuilder pageBuilder) + private ConfigSource createGuessConfig() { - // FIXME: if include is not contained in schema, data should be ignore - task.getIncludes().forEach(include -> { - String relatedObjectName = include.trim(); - final String url = task.getLoginUrl() - + "/" - + ZendeskConstants.Url.API - + "/" + task.getTarget().toString() - + "/" + jsonNode.get(ZendeskConstants.Field.ID).asText() - + "/" + relatedObjectName + ".json"; + return Exec.newConfigSource() + .set("guess_plugins", ImmutableList.of("zendesk")) + .set("guess_sample_buffer_bytes", ZendeskConstants.Misc.GUESS_BUFFER_SIZE); + } - try { - final JsonNode result = getZendeskSupportAPIService(task).getData(url, 0, false, 0); - if (result != null && result.has(relatedObjectName)) { - ((ObjectNode) jsonNode).set(include, result.get(relatedObjectName)); + private ZendeskService getZendeskService(PluginTask task) + { + if (zendeskService == null) { + zendeskService = dispatchPerTarget(task); + } + return zendeskService; + } + + @VisibleForTesting + protected ZendeskService dispatchPerTarget(ZendeskInputPlugin.PluginTask task) + { + switch (task.getTarget()) { + case TICKETS: + case USERS: + case ORGANIZATIONS: + case TICKET_METRICS: + case TICKET_EVENTS: + case TICKET_FORMS: + case TICKET_FIELDS: + return new ZendeskSupportAPIService(task); + case RECIPIENTS: + case SCORES: + return new ZendeskNPSService(task); + case OBJECT_RECORDS: + case RELATIONSHIP_RECORDS: + return new ZendeskCustomObjectService(task); + case USER_EVENTS: + return new ZendeskUserEventService(task); + default: + throw new ConfigException("Unsupported " + task.getTarget() + ", supported values: '" + Arrays.toString(Target.values()) + "'"); + } + } + + private RecordImporter getRecordImporter(Schema schema, PageBuilder pageBuilder) + { + if (recordImporter == null) { + recordImporter = new RecordImporter(schema, pageBuilder); + } + return recordImporter; + } + + private void validateInputTask(PluginTask task) + { + validateAppMarketPlace(task.getAppMarketPlaceIntegrationName().isPresent(), + task.getAppMarketPlaceAppId().isPresent(), + task.getAppMarketPlaceOrgId().isPresent()); + validateCredentials(task); + validateIncremental(task); + validateCustomObject(task); + validateUserEvent(task); + } + + private void validateCredentials(PluginTask task) + { + switch (task.getAuthenticationMethod()) { + case OAUTH: + if (!task.getAccessToken().isPresent()) { + throw new ConfigException(String.format("access_token is required for authentication method '%s'", + task.getAuthenticationMethod().name().toLowerCase())); } - } - catch (final ConfigException e) { - // Sometimes we get 404 when having invalid endpoint, so ignore when we get 404 InvalidEndpoint - if (!(e.getCause() instanceof ZendeskException && ((ZendeskException) e.getCause()).getStatusCode() == HttpStatus.SC_NOT_FOUND)) { - throw e; + break; + case TOKEN: + if (!task.getUsername().isPresent() || !task.getToken().isPresent()) { + throw new ConfigException(String.format("username and token are required for authentication method '%s'", + task.getAuthenticationMethod().name().toLowerCase())); } - } - }); + break; + case BASIC: + if (!task.getUsername().isPresent() || !task.getPassword().isPresent()) { + throw new ConfigException(String.format("username and password are required for authentication method '%s'", + task.getAuthenticationMethod().name().toLowerCase())); + } + break; + default: + throw new ConfigException("Unknown authentication method"); + } + } - ZendeskUtils.addRecord(jsonNode, schema, pageBuilder); + private void validateAppMarketPlace(final boolean isAppMarketIntegrationNamePresent, + final boolean isAppMarketAppIdPresent, + final boolean isAppMarketOrgIdPresent) + { + final boolean isAllAvailable = + isAppMarketIntegrationNamePresent && isAppMarketAppIdPresent && isAppMarketOrgIdPresent; + final boolean isAllUnAvailable = + !isAppMarketIntegrationNamePresent && !isAppMarketAppIdPresent && !isAppMarketOrgIdPresent; + // All or nothing needed + if (!(isAllAvailable || isAllUnAvailable)) { + throw new ConfigException("All of app_marketplace_integration_name, app_marketplace_org_id, " + + "app_marketplace_app_id " + + "are required to fill out for Apps Marketplace API header"); + } } - private ConfigSource createGuessConfig() + private void validateIncremental(PluginTask task) { - return Exec.newConfigSource() - .set("guess_plugins", ImmutableList.of("zendesk")) - .set("guess_sample_buffer_bytes", ZendeskConstants.Misc.GUESS_BUFFER_SIZE); + if (task.getIncremental() && getZendeskService(task).isSupportIncremental()) { + if (!task.getDedup()) { + logger.warn("You've selected to skip de-duplicating records, result may contain duplicated data"); + } + + if (!getZendeskService(task).isSupportIncremental() && task.getStartTime().isPresent()) { + logger.warn(String.format("Target: '%s' doesn't support incremental export API. Will be ignored start_time option", + task.getTarget())); + } + } } - private boolean isUpdatedBySystem(JsonNode recordJsonNode, long startTime) + private void validateCustomObject(PluginTask task) { - /* - * https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates - * "generated_timestamp" will be updated when Zendesk internal changing - * "updated_at" will be updated when ticket data was changed - * start_time for query parameter will be processed on Zendesk with generated_timestamp, - * but it was calculated by record' updated_at time. - * So the doesn't changed record from previous import would be appear by Zendesk internal changes. - * We ignore record that has updated_at <= start_time - */ - if (recordJsonNode.has(ZendeskConstants.Field.GENERATED_TIMESTAMP) && recordJsonNode.has(ZendeskConstants.Field.UPDATED_AT)) { - String recordUpdatedAtTime = recordJsonNode.get(ZendeskConstants.Field.UPDATED_AT).asText(); - long recordUpdatedAtToEpochSecond = ZendeskDateUtils.isoToEpochSecond(recordUpdatedAtTime); - return recordUpdatedAtToEpochSecond <= startTime; + if (task.getTarget().equals(Target.OBJECT_RECORDS) && task.getObjectTypes().isEmpty()) { + throw new ConfigException("Should have at least one Object Type"); } - return false; + if (task.getTarget().equals(Target.RELATIONSHIP_RECORDS) && task.getRelationshipTypes().isEmpty()) { + throw new ConfigException("Should have at least one Relationship Type"); + } + } + + private void validateUserEvent(PluginTask task) + { + if (task.getTarget().equals(Target.USER_EVENTS)) { + if (!task.getProfileSource().isPresent()) { + throw new ConfigException("Profile Source is required for User Event Target"); + } + + // Can't set end_time to 0, so it should be valid + task.getEndTime().ifPresent(time -> { + if (!ZendeskDateUtils.supportedTimeFormat(task.getEndTime().get()).isPresent()) { + throw new ConfigException("End Time should follow these format " + ZendeskConstants.Misc.SUPPORT_DATE_TIME_FORMAT.toString()); + } + }); + + if (task.getStartTime().isPresent() && task.getEndTime().isPresent() + && ZendeskDateUtils.getStartTime(task.getStartTime().get()) > ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get())) { + throw new ConfigException("End Time should be later or equal than Start Time"); + } + } } }