src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.4 vs src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java in embulk-input-zendesk-0.3.5
- old
+ new
@@ -3,14 +3,12 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
-import org.apache.http.HttpStatus;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
@@ -18,18 +16,19 @@
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.exec.GuessExecutor;
import org.embulk.input.zendesk.models.AuthenticationMethod;
import org.embulk.input.zendesk.models.Target;
-import org.embulk.input.zendesk.models.ZendeskException;
+import org.embulk.input.zendesk.services.ZendeskCustomObjectService;
+import org.embulk.input.zendesk.services.ZendeskNPSService;
+import org.embulk.input.zendesk.services.ZendeskService;
import org.embulk.input.zendesk.services.ZendeskSupportAPIService;
+import org.embulk.input.zendesk.services.ZendeskUserEventService;
import org.embulk.input.zendesk.utils.ZendeskConstants;
import org.embulk.input.zendesk.utils.ZendeskDateUtils;
import org.embulk.input.zendesk.utils.ZendeskUtils;
-import org.embulk.input.zendesk.utils.ZendeskValidatorUtils;
import org.embulk.spi.Buffer;
-import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
@@ -43,18 +42,14 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class ZendeskInputPlugin implements InputPlugin
@@ -131,31 +126,58 @@
@Config("app_marketplace_app_id")
@ConfigDefault("null")
Optional<String> getAppMarketPlaceAppId();
+ @Config("object_types")
+ @ConfigDefault("[]")
+ List<String> getObjectTypes();
+
+ @Config("relationship_types")
+ @ConfigDefault("[]")
+ List<String> getRelationshipTypes();
+
+ @Config("profile_source")
+ @ConfigDefault("null")
+ Optional<String> getProfileSource();
+
+ @Config("end_time")
+ @ConfigDefault("null")
+ Optional<String> getEndTime();
+
+ @Config("user_event_type")
+ @ConfigDefault("null")
+ Optional<String> getUserEventType();
+
+ @Config("user_event_source")
+ @ConfigDefault("null")
+ Optional<String> getUserEventSource();
+
@Config("columns")
SchemaConfig getColumns();
}
- private static final Logger logger = Exec.getLogger(ZendeskInputPlugin.class);
+ private ZendeskService zendeskService;
- private ZendeskSupportAPIService zendeskSupportAPIService;
+ private RecordImporter recordImporter;
+ private static final Logger logger = Exec.getLogger(ZendeskInputPlugin.class);
+
@Override
public ConfigDiff transaction(final ConfigSource config, final Control control)
{
final PluginTask task = config.loadConfig(PluginTask.class);
- ZendeskValidatorUtils.validateInputTask(task, getZendeskSupportAPIService(task));
+ validateInputTask(task);
final Schema schema = task.getColumns().toSchema();
int taskCount = 1;
// For non-incremental target, we will split records based on number of pages. 100 records per page
// In preview, run with taskCount = 1
- if (!ZendeskUtils.isSupportAPIIncremental(task.getTarget()) && !Exec.isPreview()) {
- final JsonNode result = getZendeskSupportAPIService(task).getData("", 0, false, 0);
- if (result.has(ZendeskConstants.Field.COUNT) && result.get(ZendeskConstants.Field.COUNT).isInt()) {
+ if (!Exec.isPreview() && !getZendeskService(task).isSupportIncremental() && getZendeskService(task) instanceof ZendeskSupportAPIService) {
+ final JsonNode result = getZendeskService(task).getDataFromPath("", 0, false, 0);
+ if (result.has(ZendeskConstants.Field.COUNT) && !result.get(ZendeskConstants.Field.COUNT).isNull()
+ && result.get(ZendeskConstants.Field.COUNT).isInt()) {
taskCount = ZendeskUtils.numberToSplitWithHintingInTask(result.get(ZendeskConstants.Field.COUNT).asInt());
}
}
return resume(task.dump(), schema, taskCount, control);
}
@@ -176,36 +198,26 @@
@Override
public TaskReport run(final TaskSource taskSource, final Schema schema, final int taskIndex, final PageOutput output)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
try (final PageBuilder pageBuilder = getPageBuilder(schema, output)) {
- final TaskReport taskReport = ingestServiceData(task, taskIndex, schema, pageBuilder);
+ final TaskReport taskReport = getZendeskService(task).addRecordToImporter(taskIndex, getRecordImporter(schema, pageBuilder));
pageBuilder.finish();
return taskReport;
}
}
@Override
public ConfigDiff guess(final ConfigSource config)
{
config.set("columns", new ObjectMapper().createArrayNode());
final PluginTask task = config.loadConfig(PluginTask.class);
- ZendeskValidatorUtils.validateInputTask(task, getZendeskSupportAPIService(task));
+ validateInputTask(task);
return Exec.newConfigDiff().set("columns", buildColumns(task));
}
@VisibleForTesting
- protected ZendeskSupportAPIService getZendeskSupportAPIService(final PluginTask task)
- {
- if (this.zendeskSupportAPIService == null) {
- this.zendeskSupportAPIService = new ZendeskSupportAPIService(task);
- }
- this.zendeskSupportAPIService.setTask(task);
- return this.zendeskSupportAPIService;
- }
-
- @VisibleForTesting
protected PageBuilder getPageBuilder(final Schema schema, final PageOutput output)
{
return new PageBuilder(Exec.getBufferAllocator(), schema, output);
}
@@ -218,157 +230,30 @@
if (taskReport.has(ZendeskConstants.Field.START_TIME)) {
final OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(
taskReport.get(JsonNode.class, ZendeskConstants.Field.START_TIME).asLong()), ZoneOffset.UTC);
configDiff.set(ZendeskConstants.Field.START_TIME,
- offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT)));
+ offsetDateTime.format(DateTimeFormatter.ofPattern(ZendeskConstants.Misc.RUBY_TIMESTAMP_FORMAT_INPUT)));
}
}
return configDiff;
}
- private TaskReport ingestServiceData(final PluginTask task, final int taskIndex,
- final Schema schema, final PageBuilder pageBuilder)
- {
- final TaskReport taskReport = Exec.newTaskReport();
-
- if (ZendeskUtils.isSupportAPIIncremental(task.getTarget())) {
- importDataForIncremental(task, schema, pageBuilder, taskReport);
- }
- else {
- importDataForNonIncremental(task, taskIndex, schema, pageBuilder);
- }
-
- return taskReport;
- }
-
- private void importDataForIncremental(final PluginTask task, final Schema schema,
- final PageBuilder pageBuilder, final TaskReport taskReport)
- {
- long startTime = 0;
-
- if (ZendeskUtils.isSupportAPIIncremental(task.getTarget()) && task.getStartTime().isPresent()) {
- startTime = ZendeskDateUtils.isoToEpochSecond(task.getStartTime().get());
- }
-
- // For incremental target, we will run in one task but split in multiple threads inside for data deduplication.
- // Run with incremental will contain duplicated data.
- ThreadPoolExecutor pool = null;
- try {
- Set<String> knownIds = ConcurrentHashMap.newKeySet();
- pool = new ThreadPoolExecutor(
- 10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
- );
-
- while (true) {
- int recordCount = 0;
-
- // Page argument isn't used in incremental API so we just set it to 0
- final JsonNode result = getZendeskSupportAPIService(task).getData("", 0, false, startTime);
- final Iterator<JsonNode> iterator = getListRecords(result, task.getTarget().getJsonName());
-
- int numberOfRecords = 0;
- if (result.has(ZendeskConstants.Field.COUNT)) {
- numberOfRecords = result.get(ZendeskConstants.Field.COUNT).asInt();
- }
-
- while (iterator.hasNext()) {
- final JsonNode recordJsonNode = iterator.next();
-
- if (isUpdatedBySystem(recordJsonNode, startTime)) {
- continue;
- }
-
- if (task.getDedup()) {
- String recordID = recordJsonNode.get(ZendeskConstants.Field.ID).asText();
- if (knownIds.contains(recordID)) {
- continue;
- }
- knownIds.add(recordID);
- }
-
- pool.submit(() -> fetchData(recordJsonNode, task, schema, pageBuilder));
- recordCount++;
- if (Exec.isPreview()) {
- return;
- }
- }
- logger.info("Fetched '{}' records from start_time '{}'", recordCount, startTime);
-
- if (task.getIncremental()) {
- if (result.has(ZendeskConstants.Field.END_TIME) && !result.get(ZendeskConstants.Field.END_TIME).isNull()
- && result.has(task.getTarget().getJsonName())) {
- // NOTE: start_time compared as "=>", not ">".
- // If we will use end_time for next start_time, we got the same record that is last fetched
- // end_time + 1 is workaround for that
- taskReport.set(ZendeskConstants.Field.START_TIME, result.get(ZendeskConstants.Field.END_TIME).asLong() + 1);
- }
- else {
- // Sometimes no record and no end_time fetched on the job, but we should generate start_time on config_diff.
- taskReport.set(ZendeskConstants.Field.START_TIME, Instant.now().getEpochSecond());
- }
- }
-
- if (numberOfRecords < ZendeskConstants.Misc.MAXIMUM_RECORDS_INCREMENTAL) {
- break;
- }
- else {
- startTime = result.get(ZendeskConstants.Field.END_TIME).asLong();
- }
- }
- }
- finally {
- if (pool != null) {
- pool.shutdown();
- try {
- pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
- catch (final InterruptedException e) {
- logger.warn("Error when wait pool to finish");
- throw Throwables.propagate(e);
- }
- }
- }
- }
-
- private void importDataForNonIncremental(final PluginTask task, final int taskIndex, final Schema schema,
- final PageBuilder pageBuilder)
- {
- // Page start from 1 => page = taskIndex + 1
- final JsonNode result = getZendeskSupportAPIService(task).getData("", taskIndex + 1, false, 0);
- final Iterator<JsonNode> iterator = getListRecords(result, task.getTarget().getJsonName());
-
- while (iterator.hasNext()) {
- fetchData(iterator.next(), task, schema, pageBuilder);
-
- if (Exec.isPreview()) {
- break;
- }
- }
- }
-
- private Iterator<JsonNode> getListRecords(JsonNode result, String targetJsonName)
- {
- if (!result.has(targetJsonName) || !result.get(targetJsonName).isArray()) {
- throw new DataException(String.format("Missing '%s' from Zendesk API response", targetJsonName));
- }
- return result.get(targetJsonName).elements();
- }
-
private JsonNode buildColumns(final PluginTask task)
{
- JsonNode jsonNode = getZendeskSupportAPIService(task).getData("", 0, true, 0);
+ JsonNode jsonNode = getZendeskService(task).getDataFromPath("", 0, true, 0);
String targetName = task.getTarget().getJsonName();
- if (jsonNode.has(targetName) && jsonNode.get(targetName).isArray() && jsonNode.get(targetName).size() > 0) {
+ if (jsonNode.has(targetName) && !jsonNode.get(targetName).isNull() && jsonNode.get(targetName).isArray() && jsonNode.get(targetName).size() > 0) {
return addAllColumnsToSchema(jsonNode, task.getTarget(), task.getIncludes());
}
throw new ConfigException("Could not guess schema due to empty data set");
}
private final Pattern idPattern = Pattern.compile(ZendeskConstants.Regex.ID);
+
private JsonNode addAllColumnsToSchema(final JsonNode jsonNode, final Target target, final List<String> includes)
{
final JsonNode sample = new ObjectMapper().valueToTree(StreamSupport.stream(
jsonNode.get(target.getJsonName()).spliterator(), false).limit(10).collect(Collectors.toList()));
final Buffer bufferSample = Buffer.copyOf(sample.toString().getBytes(StandardCharsets.UTF_8));
@@ -392,10 +277,15 @@
if (type.equals(Types.TIMESTAMP.getName())) {
entry.remove("format");
}
entry.put("type", Types.LONG.getName());
}
+
+ // Id of User Events target is more suitable for String
+ if (target.equals(Target.USER_EVENTS)) {
+ entry.put("type", Types.STRING.getName());
+ }
}
else if (idPattern.matcher(name).find()) {
if (type.equals(Types.TIMESTAMP.getName())) {
entry.remove("format");
}
@@ -415,62 +305,152 @@
.put("name", include)
.put("type", Types.JSON.getName()))
.forEach(arrayNode::add);
}
- private void fetchData(final JsonNode jsonNode, final PluginTask task, final Schema schema,
- final PageBuilder pageBuilder)
+ private ConfigSource createGuessConfig()
{
- // FIXME: if include is not contained in schema, data should be ignore
- task.getIncludes().forEach(include -> {
- String relatedObjectName = include.trim();
- final String url = task.getLoginUrl()
- + "/"
- + ZendeskConstants.Url.API
- + "/" + task.getTarget().toString()
- + "/" + jsonNode.get(ZendeskConstants.Field.ID).asText()
- + "/" + relatedObjectName + ".json";
+ return Exec.newConfigSource()
+ .set("guess_plugins", ImmutableList.of("zendesk"))
+ .set("guess_sample_buffer_bytes", ZendeskConstants.Misc.GUESS_BUFFER_SIZE);
+ }
- try {
- final JsonNode result = getZendeskSupportAPIService(task).getData(url, 0, false, 0);
- if (result != null && result.has(relatedObjectName)) {
- ((ObjectNode) jsonNode).set(include, result.get(relatedObjectName));
+ private ZendeskService getZendeskService(PluginTask task)
+ {
+ if (zendeskService == null) {
+ zendeskService = dispatchPerTarget(task);
+ }
+ return zendeskService;
+ }
+
+ @VisibleForTesting
+ protected ZendeskService dispatchPerTarget(ZendeskInputPlugin.PluginTask task)
+ {
+ switch (task.getTarget()) {
+ case TICKETS:
+ case USERS:
+ case ORGANIZATIONS:
+ case TICKET_METRICS:
+ case TICKET_EVENTS:
+ case TICKET_FORMS:
+ case TICKET_FIELDS:
+ return new ZendeskSupportAPIService(task);
+ case RECIPIENTS:
+ case SCORES:
+ return new ZendeskNPSService(task);
+ case OBJECT_RECORDS:
+ case RELATIONSHIP_RECORDS:
+ return new ZendeskCustomObjectService(task);
+ case USER_EVENTS:
+ return new ZendeskUserEventService(task);
+ default:
+ throw new ConfigException("Unsupported " + task.getTarget() + ", supported values: '" + Arrays.toString(Target.values()) + "'");
+ }
+ }
+
+ private RecordImporter getRecordImporter(Schema schema, PageBuilder pageBuilder)
+ {
+ if (recordImporter == null) {
+ recordImporter = new RecordImporter(schema, pageBuilder);
+ }
+ return recordImporter;
+ }
+
+ private void validateInputTask(PluginTask task)
+ {
+ validateAppMarketPlace(task.getAppMarketPlaceIntegrationName().isPresent(),
+ task.getAppMarketPlaceAppId().isPresent(),
+ task.getAppMarketPlaceOrgId().isPresent());
+ validateCredentials(task);
+ validateIncremental(task);
+ validateCustomObject(task);
+ validateUserEvent(task);
+ }
+
+ private void validateCredentials(PluginTask task)
+ {
+ switch (task.getAuthenticationMethod()) {
+ case OAUTH:
+ if (!task.getAccessToken().isPresent()) {
+ throw new ConfigException(String.format("access_token is required for authentication method '%s'",
+ task.getAuthenticationMethod().name().toLowerCase()));
}
- }
- catch (final ConfigException e) {
- // Sometimes we get 404 when having invalid endpoint, so ignore when we get 404 InvalidEndpoint
- if (!(e.getCause() instanceof ZendeskException && ((ZendeskException) e.getCause()).getStatusCode() == HttpStatus.SC_NOT_FOUND)) {
- throw e;
+ break;
+ case TOKEN:
+ if (!task.getUsername().isPresent() || !task.getToken().isPresent()) {
+ throw new ConfigException(String.format("username and token are required for authentication method '%s'",
+ task.getAuthenticationMethod().name().toLowerCase()));
}
- }
- });
+ break;
+ case BASIC:
+ if (!task.getUsername().isPresent() || !task.getPassword().isPresent()) {
+ throw new ConfigException(String.format("username and password are required for authentication method '%s'",
+ task.getAuthenticationMethod().name().toLowerCase()));
+ }
+ break;
+ default:
+ throw new ConfigException("Unknown authentication method");
+ }
+ }
- ZendeskUtils.addRecord(jsonNode, schema, pageBuilder);
+ private void validateAppMarketPlace(final boolean isAppMarketIntegrationNamePresent,
+ final boolean isAppMarketAppIdPresent,
+ final boolean isAppMarketOrgIdPresent)
+ {
+ final boolean isAllAvailable =
+ isAppMarketIntegrationNamePresent && isAppMarketAppIdPresent && isAppMarketOrgIdPresent;
+ final boolean isAllUnAvailable =
+ !isAppMarketIntegrationNamePresent && !isAppMarketAppIdPresent && !isAppMarketOrgIdPresent;
+ // All or nothing needed
+ if (!(isAllAvailable || isAllUnAvailable)) {
+ throw new ConfigException("All of app_marketplace_integration_name, app_marketplace_org_id, " +
+ "app_marketplace_app_id " +
+ "are required to fill out for Apps Marketplace API header");
+ }
}
- private ConfigSource createGuessConfig()
+ private void validateIncremental(PluginTask task)
{
- return Exec.newConfigSource()
- .set("guess_plugins", ImmutableList.of("zendesk"))
- .set("guess_sample_buffer_bytes", ZendeskConstants.Misc.GUESS_BUFFER_SIZE);
+ if (task.getIncremental() && getZendeskService(task).isSupportIncremental()) {
+ if (!task.getDedup()) {
+ logger.warn("You've selected to skip de-duplicating records, result may contain duplicated data");
+ }
+
+ if (!getZendeskService(task).isSupportIncremental() && task.getStartTime().isPresent()) {
+ logger.warn(String.format("Target: '%s' doesn't support incremental export API. Will be ignored start_time option",
+ task.getTarget()));
+ }
+ }
}
- private boolean isUpdatedBySystem(JsonNode recordJsonNode, long startTime)
+ private void validateCustomObject(PluginTask task)
{
- /*
- * https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
- * "generated_timestamp" will be updated when Zendesk internal changing
- * "updated_at" will be updated when ticket data was changed
- * start_time for query parameter will be processed on Zendesk with generated_timestamp,
- * but it was calculated by record' updated_at time.
- * So the doesn't changed record from previous import would be appear by Zendesk internal changes.
- * We ignore record that has updated_at <= start_time
- */
- if (recordJsonNode.has(ZendeskConstants.Field.GENERATED_TIMESTAMP) && recordJsonNode.has(ZendeskConstants.Field.UPDATED_AT)) {
- String recordUpdatedAtTime = recordJsonNode.get(ZendeskConstants.Field.UPDATED_AT).asText();
- long recordUpdatedAtToEpochSecond = ZendeskDateUtils.isoToEpochSecond(recordUpdatedAtTime);
- return recordUpdatedAtToEpochSecond <= startTime;
+ if (task.getTarget().equals(Target.OBJECT_RECORDS) && task.getObjectTypes().isEmpty()) {
+ throw new ConfigException("Should have at least one Object Type");
}
- return false;
+ if (task.getTarget().equals(Target.RELATIONSHIP_RECORDS) && task.getRelationshipTypes().isEmpty()) {
+ throw new ConfigException("Should have at least one Relationship Type");
+ }
+ }
+
+ private void validateUserEvent(PluginTask task)
+ {
+ if (task.getTarget().equals(Target.USER_EVENTS)) {
+ if (!task.getProfileSource().isPresent()) {
+ throw new ConfigException("Profile Source is required for User Event Target");
+ }
+
+ // Can't set end_time to 0, so it should be valid
+ task.getEndTime().ifPresent(time -> {
+ if (!ZendeskDateUtils.supportedTimeFormat(task.getEndTime().get()).isPresent()) {
+ throw new ConfigException("End Time should follow these format " + ZendeskConstants.Misc.SUPPORT_DATE_TIME_FORMAT.toString());
+ }
+ });
+
+ if (task.getStartTime().isPresent() && task.getEndTime().isPresent()
+ && ZendeskDateUtils.getStartTime(task.getStartTime().get()) > ZendeskDateUtils.isoToEpochSecond(task.getEndTime().get())) {
+ throw new ConfigException("End Time should be later or equal than Start Time");
+ }
+ }
}
}