lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.1.13 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.6

- old (embulk-input-zendesk-0.1.13)
+ new (embulk-input-zendesk-0.2.6)

@@ -1,32 +1,47 @@
 require "strscan"
-require "thread"
 require "httpclient"
+require 'concurrent'
 
 module Embulk
   module Input
     module Zendesk
       class Client
         attr_reader :config
 
         PARTIAL_RECORDS_SIZE = 50
         PARTIAL_RECORDS_BYTE_SIZE = 50000
-        AVAILABLE_INCREMENTAL_EXPORT = %w(tickets users organizations ticket_events).freeze
-        UNAVAILABLE_INCREMENTAL_EXPORT = %w(ticket_fields ticket_forms ticket_metrics).freeze
+        AVAILABLE_INCREMENTAL_EXPORT = %w(tickets users organizations ticket_events ticket_metrics).freeze
+        UNAVAILABLE_INCREMENTAL_EXPORT = %w(ticket_fields ticket_forms).freeze
         AVAILABLE_TARGETS = AVAILABLE_INCREMENTAL_EXPORT + UNAVAILABLE_INCREMENTAL_EXPORT
 
         def initialize(config)
           @config = config
         end
 
         def httpclient
-          httpclient = HTTPClient.new
-          httpclient.connect_timeout = 240 # default:60 is not enough for huge data
-          # httpclient.debug_dev = STDOUT
-          return set_auth(httpclient)
+          # multi-threading + retry can create a lot of instances, and each keeps its own connections;
+          # re-using one instance across threads lets us omit per-instance cleanup code
+          @httpclient ||=
+            begin
+              clnt = HTTPClient.new
+              clnt.connect_timeout = 240 # default:60 is not enough for huge data
+              clnt.receive_timeout = 240 # better to change the default receive_timeout too
+              # httpclient.debug_dev = STDOUT
+              set_auth(clnt)
            end
         end
 
+        def get_pool
+          Concurrent::ThreadPoolExecutor.new(
+            min_threads: 10,
+            max_threads: 100,
+            max_queue: 10_000,
+            fallback_policy: :caller_runs
+          )
+        end
+
         def validate_config
           validate_credentials
           validate_target
         end

@@ -67,18 +82,36 @@
             path = "/api/v2/incremental/#{target}"
             incremental_export(path, target, start_time, [], partial, &block)
           end
         end
 
+        # Ticket metrics have to be exported via both the non-incremental endpoint and the
+        # incremental ticket endpoint. We support this by filtering out ticket_metrics whose
+        # created_at is earlier than start_time, while passing the incremental start time
+        # through to the incremental ticket/ticket_metrics export.
+        %w(ticket_metrics).each do |target|
+          define_method(target) do |partial = true, start_time = 0, &block|
+            path = "/api/v2/incremental/tickets.json"
+            if partial
+              path = "/api/v2/#{target}.json"
+              # For partial export we must use the old endpoint: the new endpoint returns both
+              # tickets and ticket metrics, tickets first, so the current approach of cutting
+              # off the response packet won't work. Partial export is only used for preview
+              # and guess, so this should be fine.
+              export(path, target, &block)
+            else
+              incremental_export(path, "metric_sets", start_time, [], partial, {include: "metric_sets"}, &block)
+            end
+          end
+        end
+
         # they have non-incremental API only
         UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
           define_method(target) do |partial = true, start_time = 0, &block|
             path = "/api/v2/#{target}.json"
             if partial
               export(path, target, &block)
             else
-              export_parallel(path, target, &block)
+              export_parallel(path, target, start_time, &block)
             end
           end
         end
 
         def fetch_subresource(record_id, base, target)

@@ -94,54 +127,38 @@
           end
         end
 
         private
 
-        def export_parallel(path, key, workers = 5, &block)
+        def export_parallel(path, key, start_time = 0, &block)
           per_page = 100 # 100 is maximum https://developer.zendesk.com/rest_api/docs/core/introduction#pagination
           first_response = request(path, per_page: per_page, page: 1)
           first_fetched = JSON.parse(first_response.body)
           total_count = first_fetched["count"]
           last_page_num = (total_count / per_page.to_f).ceil
           Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"
 
-          queue = Queue.new
-          (2..last_page_num).each do |page|
-            queue << page
+          first_fetched[key].uniq { |r| r['id'] }.each do |record|
+            block.call record
+            # known_ticket_ids: collect fetched ticket IDs, to be excluded in the next step
           end
 
-          records = first_fetched[key]
-          mutex = Mutex.new
-          threads = workers.times.map do |n|
-            Thread.start do
-              loop do
-                break if queue.empty?
-                current_page = nil
-
-                begin
-                  Timeout.timeout(0.1) do
-                    # Somehow queue.pop(true) blocks... timeout is workaround for that
-                    current_page = queue.pop(true)
-                  end
-                rescue Timeout::Error, ThreadError => e
-                  break #=> ThreadError: queue empty
-                end
-
-                response = request(path, per_page: per_page, page: current_page)
-                fetched_records = extract_records_from_response(response, key)
-                mutex.synchronize do
-                  Embulk.logger.info "Fetched #{key} on page=#{current_page}"
-                  records.concat fetched_records
-                end
+          pool = get_pool
+          (2..last_page_num).each do |page|
+            pool.post do
+              response = request(path, per_page: per_page, page: page)
+              fetched_records = extract_records_from_response(response, key)
+              Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
+              fetched_records.uniq { |r| r['id'] }.each do |record|
+                block.call record
               end
             end
           end
-          threads.each(&:join)
 
-          records.uniq {|r| r["id"]}.each do |record|
-            block.call record
-          end
+          pool.shutdown
+          pool.wait_for_termination
+          nil # deliberately return nil here, unlike incremental_export
         end
 
         def export(path, key, page = 1, &block)
           per_page = PARTIAL_RECORDS_SIZE

@@ -158,22 +175,23 @@
           data[key].each do |record|
             block.call record
           end
         end
 
-        def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, &block)
+        def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, query = {}, &block)
           if partial
-            records = request_partial(path, {start_time: start_time}).first(5)
+            records = request_partial(path, query.merge({start_time: start_time})).first(5)
             records.uniq{|r| r["id"]}.each do |record|
               block.call record
             end
             return
           end
 
-          loop do
+          pool = get_pool
+          last_data = loop do
             start_fetching = Time.now
-            response = request(path, {start_time: start_time})
+            response = request(path, query.merge({start_time: start_time}))
             begin
               data = JSON.parse(response.body)
             rescue => e
               raise Embulk::DataError.new(e)
             end

@@ -196,21 +214,25 @@
               # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
               # https://github.com/zendesk/zendesk_api_client_rb/issues/251
               next if known_ids.include?(record["id"])
 
               known_ids << record["id"]
-              block.call record
+              pool.post { yield(record) }
               actual_fetched += 1
             end
             Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
             start_time = data["end_time"]
 
             # NOTE: If count is less than 1000, then stop paginating.
             #       Otherwise, use the next_page URL to get the next page of results.
             #       https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
             break data if data["count"] < 1000
           end
+
+          pool.shutdown
+          pool.wait_for_termination
+          last_data
         end
 
         def extract_records_from_response(response, key)
           begin
             data = JSON.parse(response.body)

@@ -362,9 +384,10 @@
             end
           else
             raise "Server returns unknown status code (#{status_code}) #{body}"
           end
         end
+        end
       end
     end
   end
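
The heart of the 0.2.6 change is replacing the hand-rolled Queue/Mutex/Thread workers with a
concurrent-ruby thread pool. Below is a minimal, standalone sketch of that pattern, assuming
only the concurrent-ruby gem is installed; `fetch_page` is a hypothetical stand-in for the
client's request/extract_records_from_response pair, not part of the gem.

    require "concurrent"

    # Hypothetical stub standing in for the real per-page HTTP request.
    def fetch_page(page)
      Array.new(3) { |i| { "id" => page * 100 + i } }
    end

    pool = Concurrent::ThreadPoolExecutor.new(
      min_threads: 10,
      max_threads: 100,
      max_queue: 10_000,
      fallback_policy: :caller_runs # a full queue runs the task on the posting thread
    )

    (2..5).each do |page|
      pool.post do
        # Runs on a pool thread; anything called here must be thread-safe.
        fetch_page(page).each { |record| puts record["id"] }
      end
    end

    pool.shutdown             # stop accepting new tasks
    pool.wait_for_termination # block until every posted task has finished

The :caller_runs fallback policy matters here: once max_queue tasks are waiting, pool.post
executes the task on the posting thread instead of raising Concurrent::RejectedExecutionError,
which gives the page loop natural backpressure.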
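incremental_export follows the Zendesk incremental-export contract: each response carries an
end_time cursor to feed back as the next start_time, time windows may overlap so records can be
resent, and a count below 1000 marks the final page. A condensed sketch of that loop, where
`get_window` and the WINDOWS fixture are hypothetical stubs standing in for the HTTP request
plus JSON parsing:

    require "set"

    # Two fake response windows; the second resends id 2 to show deduplication.
    WINDOWS = [
      { "tickets" => [{ "id" => 1 }, { "id" => 2 }], "count" => 1000, "end_time" => 200 },
      { "tickets" => [{ "id" => 2 }, { "id" => 3 }], "count" => 2,    "end_time" => 300 },
    ]

    def get_window(start_time)
      start_time < 200 ? WINDOWS[0] : WINDOWS[1]
    end

    def each_incremental(key, start_time)
      known_ids = Set.new
      loop do
        data = get_window(start_time)
        data[key].each do |record|
          # Windows can overlap, so the API may resend records; skip IDs already seen.
          next if known_ids.include?(record["id"])
          known_ids << record["id"]
          yield record
        end
        start_time = data["end_time"] # the next window starts where this one ended
        break if data["count"] < 1000 # fewer than 1000 results marks the last page
      end
    end

    each_incremental("tickets", 0) { |r| puts r["id"] } # prints 1, 2, 3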