lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.1.13 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.6
- old
+ new
@@ -1,32 +1,47 @@
require "strscan"
-require "thread"
require "httpclient"
+require 'concurrent'
module Embulk
module Input
module Zendesk
class Client
attr_reader :config
PARTIAL_RECORDS_SIZE = 50
PARTIAL_RECORDS_BYTE_SIZE = 50000
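+ # limits for partial fetches used by preview/guess (see export and request_partial below)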
- AVAILABLE_INCREMENTAL_EXPORT = %w(tickets users organizations ticket_events).freeze
- UNAVAILABLE_INCREMENTAL_EXPORT = %w(ticket_fields ticket_forms ticket_metrics).freeze
+ AVAILABLE_INCREMENTAL_EXPORT = %w(tickets users organizations ticket_events ticket_metrics).freeze
+ UNAVAILABLE_INCREMENTAL_EXPORT = %w(ticket_fields ticket_forms).freeze
AVAILABLE_TARGETS = AVAILABLE_INCREMENTAL_EXPORT + UNAVAILABLE_INCREMENTAL_EXPORT
def initialize(config)
@config = config
end
def httpclient
- httpclient = HTTPClient.new
- httpclient.connect_timeout = 240 # default:60 is not enough for huge data
- # httpclient.debug_dev = STDOUT
- return set_auth(httpclient)
+ # multi-threading + retry can create a lot of client instances, each keeping its own connections
+ # re-using a single instance across threads lets us omit connection cleanup code
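+ # NOTE: ||= is not atomic; a first concurrent call may briefly build a duplicate client, which is harmless here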
+ @httpclient ||=
+ begin
+ clnt = HTTPClient.new
+ clnt.connect_timeout = 240 # default:60 is not enough for huge data
+ clnt.receive_timeout = 240 # better to raise the default receive_timeout too
+ # httpclient.debug_dev = STDOUT
+ set_auth(clnt)
+ end
end
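+ # A fresh pool is created per export. With fallback_policy :caller_runs, once max_queue
+ # tasks are pending, pool.post runs the task on the calling thread instead, which
+ # naturally throttles how quickly new pages are requested.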
+ def get_pool
+ Concurrent::ThreadPoolExecutor.new(
+ min_threads: 10,
+ max_threads: 100,
+ max_queue: 10_000,
+ fallback_policy: :caller_runs
+ )
+ end
+
def validate_config
validate_credentials
validate_target
end
@@ -67,18 +82,36 @@
path = "/api/v2/incremental/#{target}"
incremental_export(path, target, start_time, [], partial, &block)
end
end
+ # Ticket metrics need to be exported through both the non-incremental endpoint and the incremental tickets endpoint.
+ # We support this by filtering out ticket_metrics whose created_at is earlier than start_time,
+ # while passing the incremental start_time to the incremental ticket/ticket_metrics export.
+ %w(ticket_metrics).each do |target|
+ define_method(target) do |partial = true, start_time = 0, &block|
+ path = "/api/v2/incremental/tickets.json"
+ if partial
+ path = "/api/v2/#{target}.json"
+ # For a partial export we need the old endpoint: the new endpoint returns both tickets and
+ # ticket metrics, tickets first, so the current approach of cutting off the response packet won't work.
+ # Partial is only used for preview and guess, so this is fine.
+ export(path, target, &block)
+ else
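+ # side-load the metrics with the tickets, e.g.
+ # GET /api/v2/incremental/tickets.json?start_time=0&include=metric_sets
+ # and consume the "metric_sets" key of the response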
+ incremental_export(path, "metric_sets", start_time, [], partial, {include: "metric_sets"}, &block)
+ end
+ end
+ end
+
# they have non-incremental API only
UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
path = "/api/v2/#{target}.json"
if partial
export(path, target, &block)
else
- export_parallel(path, target, &block)
+ export_parallel(path, target, start_time, &block)
end
end
end
def fetch_subresource(record_id, base, target)
@@ -94,54 +127,38 @@
end
end
private
- def export_parallel(path, key, workers = 5, &block)
+ def export_parallel(path, key, start_time = 0, &block)
per_page = 100 # 100 is maximum https://developer.zendesk.com/rest_api/docs/core/introduction#pagination
first_response = request(path, per_page: per_page, page: 1)
first_fetched = JSON.parse(first_response.body)
total_count = first_fetched["count"]
last_page_num = (total_count / per_page.to_f).ceil
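+ # e.g. count=2350 with per_page=100 gives last_page_num=24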
Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"
- queue = Queue.new
- (2..last_page_num).each do |page|
- queue << page
+ first_fetched[key].uniq { |r| r['id'] }.each do |record|
+ block.call record
+ # dedupe within the page by id before yielding each record
end
- records = first_fetched[key]
- mutex = Mutex.new
- threads = workers.times.map do |n|
- Thread.start do
- loop do
- break if queue.empty?
- current_page = nil
-
- begin
- Timeout.timeout(0.1) do
- # Somehow queue.pop(true) blocks... timeout is workaround for that
- current_page = queue.pop(true)
- end
- rescue Timeout::Error, ThreadError => e
- break #=> ThreadError: queue empty
- end
-
- response = request(path, per_page: per_page, page: current_page)
- fetched_records = extract_records_from_response(response, key)
- mutex.synchronize do
- Embulk.logger.info "Fetched #{key} on page=#{current_page}"
- records.concat fetched_records
- end
+ pool = get_pool
+ (2..last_page_num).each do |page|
+ pool.post do
+ response = request(path, per_page: per_page, page: page)
+ fetched_records = extract_records_from_response(response, key)
+ Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
+ fetched_records.uniq { |r| r['id'] }.each do |record|
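+ # NOTE: this yields from a pool thread, so the consumer block must be thread-safe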
+ block.call record
end
end
end
- threads.each(&:join)
- records.uniq {|r| r["id"]}.each do |record|
- block.call record
- end
+ pool.shutdown
+ pool.wait_for_termination
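+ # shutdown stops accepting new tasks; wait_for_termination then blocks until queued tasks finish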
+
nil # intentionally return nil here, unlike incremental_export
end
def export(path, key, page = 1, &block)
per_page = PARTIAL_RECORDS_SIZE
@@ -158,22 +175,23 @@
data[key].each do |record|
block.call record
end
end
- def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, &block)
+ def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, query = {}, &block)
if partial
- records = request_partial(path, {start_time: start_time}).first(5)
+ records = request_partial(path, query.merge({start_time: start_time})).first(5)
records.uniq{|r| r["id"]}.each do |record|
block.call record
end
return
end
- loop do
+ pool = get_pool
+ last_data = loop do
start_fetching = Time.now
- response = request(path, {start_time: start_time})
+ response = request(path, query.merge({start_time: start_time}))
begin
data = JSON.parse(response.body)
rescue => e
raise Embulk::DataError.new(e)
end
@@ -196,21 +214,25 @@
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
# https://github.com/zendesk/zendesk_api_client_rb/issues/251
next if known_ids.include?(record["id"])
known_ids << record["id"]
- block.call record
+ pool.post { yield(record) }
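+ # the yield runs on a pool thread (or inline via :caller_runs once the queue is full);
+ # known_ids is only mutated here on the fetching thread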
actual_fetched += 1
end
Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
start_time = data["end_time"]
# NOTE: If count is less than 1000, then stop paginating.
# Otherwise, use the next_page URL to get the next page of results.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
break data if data["count"] < 1000
end
+
+ pool.shutdown
+ pool.wait_for_termination
+ last_data
end
def extract_records_from_response(response, key)
begin
data = JSON.parse(response.body)
@@ -362,9 +384,10 @@
end
else
raise "Server returns unknown status code (#{status_code}) #{body}"
end
end
+
end
end
end
end