lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.6 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.7
- old
+ new
@@ -29,11 +29,11 @@
# httpclient.debug_dev = STDOUT
set_auth(clnt)
end
end
- def get_pool
+ def create_pool
Concurrent::ThreadPoolExecutor.new(
min_threads: 10,
max_threads: 100,
max_queue: 10_000,
fallback_policy: :caller_runs
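
Note: create_pool (renamed from get_pool) builds a concurrent-ruby executor. A minimal standalone sketch of the same contract, with the pool sizes copied from the diff and the demo workload an assumption:

    require 'concurrent'

    pool = Concurrent::ThreadPoolExecutor.new(
      min_threads: 10,
      max_threads: 100,
      max_queue: 10_000,
      fallback_policy: :caller_runs # when the queue is full, run the job on the posting thread instead of raising
    )
    10.times { |i| pool.post { puts "job #{i}" } } # hypothetical workload
    pool.shutdown
    pool.wait_for_termination # block until all queued jobs have run
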
@@ -67,40 +67,29 @@
raise Embulk::ConfigError.new("target: '#{config[:target]}' is not supported. Supported targets are #{AVAILABLE_TARGETS.join(", ")}.")
end
end
      # they have both an incremental API and a non-incremental API
- %w(tickets users organizations).each do |target|
+      # 170717: `ticket_events` can use the standard endpoint format now, i.e. `<target>.json`
+ %w(tickets ticket_events users organizations).each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
# Always use incremental_export. There is some difference between incremental_export and export.
incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], partial, &block)
end
end
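
Note: the metaprogrammed readers above share one signature, target(partial, start_time, &block). A hypothetical call site (client and the timestamp are assumptions, not from the diff):

    start_time = Time.utc(2017, 7, 1).to_i
    client.ticket_events(false, start_time) do |record|
      puts record['id'] # each record from the incremental export is yielded here
    end
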
- # they have incremental API only
- %w(ticket_events).each do |target|
- define_method(target) do |partial = true, start_time = 0, &block|
- path = "/api/v2/incremental/#{target}"
- incremental_export(path, target, start_time, [], partial, &block)
- end
- end
-
      # Ticket metrics need to be exported using both the non-incremental endpoint and the incremental tickets endpoint.
      # We support this by filtering out ticket_metrics whose created_at is earlier than start_time,
      # while passing the incremental start_time to the incremental ticket/ticket_metrics export.
- %w(ticket_metrics).each do |target|
- define_method(target) do |partial = true, start_time = 0, &block|
- path = "/api/v2/incremental/tickets.json"
- if partial
- path = "/api/v2/#{target}.json"
-            # If doing a partial export we need to use the old endpoint, since the new endpoint returns both tickets and
-            # ticket metrics, with tickets coming first, so the current approach of cutting off the response packet won't work.
-            # Since partial is only used for preview and guess, this should be fine.
- export(path, target, &block)
- else
- incremental_export(path, "metric_sets", start_time, [], partial,{include: "metric_sets"}, &block)
- end
+ define_method('ticket_metrics') do |partial = true, start_time = 0, &block|
+ if partial
+        # If doing a partial export we need to use the old endpoint, since the new endpoint returns both tickets and
+        # ticket metrics, with tickets coming first, so the current approach of cutting off the response packet won't work.
+        # Since partial is only used for preview and guess, this should be fine.
+ export('/api/v2/ticket_metrics.json', 'ticket_metrics', &block)
+ else
+ incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, [], partial, { include: 'metric_sets' }, &block)
end
end
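
Note: ticket_metrics now dispatches on partial; preview/guess hit the plain endpoint while a real run side-loads metric sets through the incremental tickets endpoint. Sketched as two hypothetical calls (client and since are assumptions):

    client.ticket_metrics(true)         # preview/guess: GET /api/v2/ticket_metrics.json
    client.ticket_metrics(false, since) # real run: GET /api/v2/incremental/tickets.json?include=metric_sets&start_time=<since>
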
      # they have a non-incremental API only
UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
@@ -137,28 +126,25 @@
last_page_num = (total_count / per_page.to_f).ceil
Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"
first_fetched[key].uniq { |r| r['id'] }.each do |record|
block.call record
- # known_ticket_ids: collect fetched ticket IDs, to exclude in next step
end
- pool = get_pool
- (2..last_page_num).each do |page|
- pool.post do
- response = request(path, per_page: per_page, page: page)
- fetched_records = extract_records_from_response(response, key)
- Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
- fetched_records.uniq { |r| r['id'] }.each do |record|
- block.call record
+ execute_thread_pool do |pool|
+ (2..last_page_num).each do |page|
+ pool.post do
+ response = request(path, per_page: per_page, page: page)
+ fetched_records = extract_records_from_response(response, key)
+ Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
+ fetched_records.uniq { |r| r['id'] }.each do |record|
+ block.call record
+ end
end
end
end
- pool.shutdown
- pool.wait_for_termination
-
        nil # return nil on purpose; this differs from incremental_export
end
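
Note: in export_parallel, page 1 comes from the probe request and pages 2..last_page_num are posted to the pool. The page arithmetic, worked with assumed values:

    total_count = 2_350                                 # assumed record count
    per_page    = 100
    last_page_num = (total_count / per_page.to_f).ceil  # => 24
    (2..last_page_num).count                            # => 23 pages fetched concurrently
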
def export(path, key, page = 1, &block)
per_page = PARTIAL_RECORDS_SIZE
@@ -175,64 +161,59 @@
data[key].each do |record|
block.call record
end
end
- def incremental_export(path, key, start_time = 0, known_ids = [], partial = true,query = {}, &block)
+ def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, query = {}, &block)
+ query.merge!(start_time: start_time)
if partial
- records = request_partial(path, query.merge({start_time: start_time})).first(5)
+ records = request_partial(path, query).first(5)
records.uniq{|r| r["id"]}.each do |record|
block.call record
end
return
end
- pool = get_pool
- last_data = loop do
- start_fetching = Time.now
- response = request(path, query.merge({start_time: start_time}))
- begin
+ execute_thread_pool do |pool|
+ loop do
+ start_fetching = Time.now
+ response = request(path, query)
+ actual_fetched = 0
data = JSON.parse(response.body)
- rescue => e
- raise Embulk::DataError.new(e)
- end
- actual_fetched = 0
- records = data[key]
- records.each do |record|
- # https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
-          # "generated_timestamp" is updated whenever Zendesk changes a record internally,
-          # while "updated_at" is updated only when the ticket data itself changes.
-          # The start_time query parameter is processed by Zendesk against generated_timestamp,
-          # but we calculate it from each record's updated_at time.
-          # So records unchanged since the previous import can reappear due to Zendesk-internal changes.
-          # We ignore records whose updated_at <= start_time.
- if start_time && record["generated_timestamp"] && record["updated_at"]
- updated_at = Time.parse(record["updated_at"])
- next if updated_at <= Time.at(start_time)
- end
+          # Occasionally the key is missing from the response => raise TempError so execute_thread_pool retries
+ raise TempError, "No '#{key}' found in JSON response" unless data.key? key
+ data[key].each do |record|
+ # https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
+            # "generated_timestamp" is updated whenever Zendesk changes a record internally,
+            # while "updated_at" is updated only when the ticket data itself changes.
+            # The start_time query parameter is processed by Zendesk against generated_timestamp,
+            # but we calculate it from each record's updated_at time.
+            # So records unchanged since the previous import can reappear due to Zendesk-internal changes.
+            # We ignore records whose updated_at <= start_time.
+ if start_time && record["generated_timestamp"] && record["updated_at"]
+ updated_at = Time.parse(record["updated_at"])
+ next if updated_at <= Time.at(start_time)
+ end
-          # De-duplicate records.
- # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
- # https://github.com/zendesk/zendesk_api_client_rb/issues/251
- next if known_ids.include?(record["id"])
+            # De-duplicate records.
+ # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
+ # https://github.com/zendesk/zendesk_api_client_rb/issues/251
+ next if known_ids.include?(record["id"])
- known_ids << record["id"]
- pool.post { yield(record) }
- actual_fetched += 1
- end
- Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
- start_time = data["end_time"]
+ known_ids << record["id"]
+ pool.post { block.call record }
+ actual_fetched += 1
+ end
+ Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
+ start_time = data["end_time"]
- # NOTE: If count is less than 1000, then stop paginating.
- # Otherwise, use the next_page URL to get the next page of results.
- # https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
- break data if data["count"] < 1000
+ # NOTE: If count is less than 1000, then stop paginating.
+ # Otherwise, use the next_page URL to get the next page of results.
+ # https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
+ break data if data["count"] < 1000
+ end
end
-
- pool.shutdown
- pool.wait_for_termination
- last_data
end
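
Note: stripped of retry and threading, the cursor loop in incremental_export reduces to the sketch below; http_get and process are stand-ins for the real request/block plumbing:

    require 'json'

    start_time = 0
    known_ids  = []
    loop do
      data = JSON.parse(http_get('/api/v2/incremental/tickets.json', start_time: start_time).body)
      data['tickets'].each do |record|
        next if known_ids.include?(record['id']) # the incremental export can repeat records across pages
        known_ids << record['id']
        process(record)
      end
      start_time = data['end_time'] # cursor for the next request
      break if data['count'] < 1000 # fewer than 1000 records means the last page
    end
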
def extract_records_from_response(response, key)
begin
data = JSON.parse(response.body)
@@ -385,9 +366,30 @@
else
raise "Server returns unknown status code (#{status_code}) #{body}"
end
end
+ def execute_thread_pool(&block)
+ pool = create_pool
+ pr = PerfectRetry.new do |config|
+ config.limit = @config[:retry_limit]
+ config.logger = Embulk.logger
+ config.log_level = nil
+ config.rescues = [TempError]
+        config.sleep = lambda { |n| @config[:retry_initial_wait_sec] * (2 ** (n - 1)) }
+ end
+ pr.with_retry { block.call(pool) }
+ rescue => e
+ raise Embulk::DataError.new(e)
+ ensure
+ Embulk.logger.info 'ThreadPool shutting down...'
+ pool.shutdown
+ pool.wait_for_termination
+ Embulk.logger.info "ThreadPool shutdown? #{pool.shutdown?}"
+ end
+ end
+
+ class TempError < StandardError
end
end
end
end
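
Note: execute_thread_pool wraps the pool work in PerfectRetry, so a TempError (e.g. a response missing the expected key) retries the whole fetch with exponential backoff. A self-contained sketch of that retry contract, with the limit and initial wait assumed:

    require 'perfect_retry'

    TempError = Class.new(StandardError)

    retryer = PerfectRetry.new do |config|
      config.limit   = 5                        # assumed retry_limit
      config.rescues = [TempError]              # only the transient error triggers a retry
      config.sleep   = ->(n) { 1 * 2**(n - 1) } # backoff: 1, 2, 4, 8, ... seconds (assumed 1s initial wait)
    end

    retryer.with_retry do
      raise TempError, 'flaky response' if rand < 0.5 # simulated transient failure
      puts 'succeeded'
    end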