lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.12 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.13
- old
+ new
@@ -80,27 +80,27 @@
end
# they have both Incremental API and non-incremental API
# 170717: `ticket_events` can use the standard endpoint format now, i.e. `<target>.json`
%w(tickets ticket_events users organizations).each do |target|
- define_method(target) do |partial = true, start_time = 0, &block|
+ define_method(target) do |partial = true, start_time = 0, dedup = true, &block|
# Always use incremental_export. There is some difference between incremental_export and export.
- incremental_export("/api/v2/incremental/#{target}.json", target, start_time, Set.new, partial, &block)
+ incremental_export("/api/v2/incremental/#{target}.json", target, start_time, dedup, Set.new, partial, &block)
end
end
# Ticket metrics will need to be exported using both the non-incremental and the incremental ticket endpoints.
# We provide support by filtering out ticket_metrics whose created_at is smaller than the start time,
# while passing the incremental start time to the incremental ticket/ticket_metrics export
- define_method('ticket_metrics') do |partial = true, start_time = 0, &block|
+ define_method('ticket_metrics') do |partial = true, start_time = 0, dedup = true, &block|
if partial
# If this is a partial export then we need to use the old endpoint, since the new endpoint returns both tickets and
# ticket metrics, with tickets coming first, so the current approach of cutting off the response packet won't work.
# Since partial is only used for preview and guess, this should be fine.
export('/api/v2/ticket_metrics.json', 'ticket_metrics', &block)
else
- incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, Set.new, partial, { include: 'metric_sets' }, &block)
+ incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, dedup, Set.new, partial, { include: 'metric_sets' }, &block)
end
end
# they have non-incremental API only
UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
@@ -173,19 +173,23 @@
data[key].each do |record|
block.call record
end
end
- def incremental_export(path, key, start_time = 0, known_ids = Set.new, partial = true, query = {}, &block)
+ def incremental_export(path, key, start_time = 0, dedup = true, known_ids = Set.new, partial = true, query = {}, &block)
if partial
records = request_partial(path, query.merge(start_time: start_time)).first(5)
records.uniq{|r| r["id"]}.each do |record|
block.call record
end
return
end
+ if !dedup
+ Embulk.logger.warn("!!! You've selected to skip de-duplicating records, result may contain duplicated data !!!")
+ end
+
execute_thread_pool do |pool|
loop do
start_fetching = Time.now
response = request(path, false, query.merge(start_time: start_time))
actual_fetched = 0
@@ -206,12 +210,14 @@
end
# De-duplicate records (skip ids already seen).
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
# https://github.com/zendesk/zendesk_api_client_rb/issues/251
- next if known_ids.include?(record["id"])
+ if dedup
+ next if known_ids.include?(record["id"])
+ known_ids << record["id"]
+ end
- known_ids << record["id"]
pool.post { block.call record }
actual_fetched += 1
end
Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
start_time = data["end_time"]