lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.6 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.7

- old
+ new

@@ -29,11 +29,11 @@
            # httpclient.debug_dev = STDOUT
            set_auth(clnt)
          end
        end

-        def get_pool
+        def create_pool
          Concurrent::ThreadPoolExecutor.new(
            min_threads: 10,
            max_threads: 100,
            max_queue: 10_000,
            fallback_policy: :caller_runs

@@ -67,40 +67,29 @@
            raise Embulk::ConfigError.new("target: '#{config[:target]}' is not supported. Supported targets are #{AVAILABLE_TARGETS.join(", ")}.")
          end
        end

        # they have both Incremental API and non-incremental API
-        %w(tickets users organizations).each do |target|
+        # 170717: `ticket_events` can use standard endpoint format now, ie. `<target>.json`
+        %w(tickets ticket_events users organizations).each do |target|
          define_method(target) do |partial = true, start_time = 0, &block|
            # Always use incremental_export. There is some difference between incremental_export and export.
            incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], partial, &block)
          end
        end

-        # they have incremental API only
-        %w(ticket_events).each do |target|
-          define_method(target) do |partial = true, start_time = 0, &block|
-            path = "/api/v2/incremental/#{target}"
-            incremental_export(path, target, start_time, [], partial, &block)
-          end
-        end
-
        # Ticket metrics will need to be export using both the non incremental and incremental on ticket
        # We provide support by filter out ticket_metrics with created at smaller than start time
        # while passing the incremental start time to the incremental ticket/ticket_metrics export
-        %w(ticket_metrics).each do |target|
-          define_method(target) do |partial = true, start_time = 0, &block|
-            path = "/api/v2/incremental/tickets.json"
-            if partial
-              path = "/api/v2/#{target}.json"
-              # If partial export then we need to use the old end point. Since new end point return both ticket and
-              # ticket metric with ticket come first so the current approach that cut off the response packet won't work
-              # Since partial is only use for preview and guess so this should be fine
-              export(path, target, &block)
-            else
-              incremental_export(path, "metric_sets", start_time, [], partial,{include: "metric_sets"}, &block)
-            end
+        define_method('ticket_metrics') do |partial = true, start_time = 0, &block|
+          if partial
+            # If partial export then we need to use the old end point. Since new end point return both ticket and
+            # ticket metric with ticket come first so the current approach that cut off the response packet won't work
+            # Since partial is only use for preview and guess so this should be fine
+            export('/api/v2/ticket_metrics.json', 'ticket_metrics', &block)
+          else
+            incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, [], partial, { include: 'metric_sets' }, &block)
          end
        end

        # they have non-incremental API only
        UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|

@@ -137,28 +126,25 @@
          last_page_num = (total_count / per_page.to_f).ceil
          Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"
          first_fetched[key].uniq { |r| r['id'] }.each do |record|
            block.call record
-            # known_ticket_ids: collect fetched ticket IDs, to exclude in next step
          end

-          pool = get_pool
-          (2..last_page_num).each do |page|
-            pool.post do
-              response = request(path, per_page: per_page, page: page)
-              fetched_records = extract_records_from_response(response, key)
-              Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
-              fetched_records.uniq { |r| r['id'] }.each do |record|
-                block.call record
+          execute_thread_pool do |pool|
+            (2..last_page_num).each do |page|
+              pool.post do
+                response = request(path, per_page: per_page, page: page)
+                fetched_records = extract_records_from_response(response, key)
+                Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
+                fetched_records.uniq { |r| r['id'] }.each do |record|
+                  block.call record
+                end
              end
            end
          end
-          pool.shutdown
-          pool.wait_for_termination
-
          nil # this is necessary different with incremental_export
        end

        def export(path, key, page = 1, &block)
          per_page = PARTIAL_RECORDS_SIZE

@@ -175,64 +161,59 @@
          data[key].each do |record|
            block.call record
          end
        end

-        def incremental_export(path, key, start_time = 0, known_ids = [], partial = true,query = {}, &block)
+        def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, query = {}, &block)
+          query.merge!(start_time: start_time)
          if partial
-            records = request_partial(path, query.merge({start_time: start_time})).first(5)
+            records = request_partial(path, query).first(5)
            records.uniq{|r| r["id"]}.each do |record|
              block.call record
            end
            return
          end

-          pool = get_pool
-          last_data = loop do
-            start_fetching = Time.now
-            response = request(path, query.merge({start_time: start_time}))
-            begin
+          execute_thread_pool do |pool|
+            loop do
+              start_fetching = Time.now
+              response = request(path, query)
+              actual_fetched = 0
              data = JSON.parse(response.body)
-            rescue => e
-              raise Embulk::DataError.new(e)
-            end
-            actual_fetched = 0
-            records = data[key]
-            records.each do |record|
-              # https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
-              # "generated_timestamp" will be updated when Zendesk internal changing
-              # "updated_at" will be updated when ticket data was changed
-              # start_time for query parameter will be processed on Zendesk with generated_timestamp,
-              # but it was calculated by record' updated_at time.
-              # So the doesn't changed record from previous import would be appear by Zendesk internal changes.
-              # We ignore record that has updated_at <= start_time
-              if start_time && record["generated_timestamp"] && record["updated_at"]
-                updated_at = Time.parse(record["updated_at"])
-                next if updated_at <= Time.at(start_time)
-              end
+              # no key found in response occasionally => retry
+              raise TempError, "No '#{key}' found in JSON response" unless data.key? key
+              data[key].each do |record|
+                # https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
+                # "generated_timestamp" will be updated when Zendesk internal changing
+                # "updated_at" will be updated when ticket data was changed
+                # start_time for query parameter will be processed on Zendesk with generated_timestamp,
+                # but it was calculated by record' updated_at time.
+                # So the doesn't changed record from previous import would be appear by Zendesk internal changes.
+                # We ignore record that has updated_at <= start_time
+                if start_time && record["generated_timestamp"] && record["updated_at"]
+                  updated_at = Time.parse(record["updated_at"])
+                  next if updated_at <= Time.at(start_time)
+                end

-              # de-duplicated records.
-              # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
-              # https://github.com/zendesk/zendesk_api_client_rb/issues/251
-              next if known_ids.include?(record["id"])
+                # de-duplicated records.
+                # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
+                # https://github.com/zendesk/zendesk_api_client_rb/issues/251
+                next if known_ids.include?(record["id"])

-              known_ids << record["id"]
-              pool.post { yield(record) }
-              actual_fetched += 1
-            end
-            Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
-            start_time = data["end_time"]
+                known_ids << record["id"]
+                pool.post { block.call record }
+                actual_fetched += 1
+              end
+              Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
+              start_time = data["end_time"]

-            # NOTE: If count is less than 1000, then stop paginating.
-            #       Otherwise, use the next_page URL to get the next page of results.
-            #       https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
-            break data if data["count"] < 1000
+              # NOTE: If count is less than 1000, then stop paginating.
+              #       Otherwise, use the next_page URL to get the next page of results.
+              #       https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
+              break data if data["count"] < 1000
+            end
          end
-
-          pool.shutdown
-          pool.wait_for_termination
-          last_data
        end

        def extract_records_from_response(response, key)
          begin
            data = JSON.parse(response.body)

@@ -385,9 +366,30 @@
          else
            raise "Server returns unknown status code (#{status_code}) #{body}"
          end
        end

+        def execute_thread_pool(&block)
+          pool = create_pool
+          pr = PerfectRetry.new do |config|
+            config.limit = @config[:retry_limit]
+            config.logger = Embulk.logger
+            config.log_level = nil
+            config.rescues = [TempError]
+            config.sleep = lambda{|n| @config[:retry_initial_wait_sec]* (2 ** (n-1)) }
+          end
+          pr.with_retry { block.call(pool) }
+        rescue => e
+          raise Embulk::DataError.new(e)
+        ensure
+          Embulk.logger.info 'ThreadPool shutting down...'
+          pool.shutdown
+          pool.wait_for_termination
+          Embulk.logger.info "ThreadPool shutdown? #{pool.shutdown?}"
+        end
      end

+      class TempError < StandardError
+      end
    end
  end
end
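
Note on the ticket_metrics change: the old %w(ticket_metrics).each loop is collapsed into a single define_method. Preview/guess (partial) runs still hit the plain /api/v2/ticket_metrics.json endpoint, while full runs side-load metric sets through the incremental tickets endpoint via the { include: 'metric_sets' } query. As an illustration only (the subdomain and start_time below are made-up values, not from the gem), the incremental branch ends up issuing a request of this shape:

    require 'uri'

    # Illustration only: the query produced once incremental_export merges
    # start_time into { include: 'metric_sets' }.
    params = URI.encode_www_form(start_time: 1_500_000_000, include: 'metric_sets')
    puts "https://example.zendesk.com/api/v2/incremental/tickets.json?#{params}"
    # => https://example.zendesk.com/api/v2/incremental/tickets.json?start_time=1500000000&include=metric_sets

The plugin then reads the "metric_sets" key from the response body, which is why the incremental branch passes 'metric_sets' as the key argument.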
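
Note on incremental_export: the rewritten method keeps the same cursor logic as before. It sends start_time, takes the next cursor from end_time in the response, skips records whose updated_at is not newer than the cursor, de-duplicates by id, and stops when count drops below 1000; the new raise TempError line lets the surrounding retry re-run a batch whose JSON is missing the expected key. A minimal standalone sketch of that loop, with fetcher standing in for the plugin's request + JSON.parse (an assumption for the example, not the gem's API):

    require 'time'

    # Minimal sketch, not the plugin's code. `fetcher` must return a Hash like
    # { "tickets" => [...], "end_time" => 123, "count" => 999 }.
    def each_incremental_record(key, fetcher:, start_time: 0)
      known_ids = {}
      loop do
        data = fetcher.call(start_time)
        # The plugin raises TempError here so PerfectRetry can retry the whole batch.
        raise "no '#{key}' in response" unless data.key?(key)

        data[key].each do |record|
          # Zendesk pages by generated_timestamp, so unchanged records can reappear;
          # skip anything whose updated_at is not newer than the current cursor.
          next if record['updated_at'] && Time.parse(record['updated_at']) <= Time.at(start_time)
          next if known_ids[record['id']]   # de-duplicate across pages
          known_ids[record['id']] = true
          yield record
        end

        start_time = data['end_time']       # next cursor comes from the response
        break if data['count'] < 1000       # fewer than 1000 records means last page
      end
    end

    # Usage with a canned fetcher that returns a single, final page:
    fake_fetcher = ->(_t) { { 'tickets' => [{ 'id' => 1, 'updated_at' => Time.now.iso8601 }], 'end_time' => 0, 'count' => 1 } }
    each_incremental_record('tickets', fetcher: fake_fetcher) { |r| puts r['id'] }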
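
Note on the thread-pool change: the duplicated get_pool / shutdown / wait_for_termination plumbing is now centralized in execute_thread_pool, which also wraps the work in PerfectRetry so a TempError raised inside the block re-runs the whole batch with exponential backoff, and the ensure clause guarantees the pool is drained even when the block fails. A standalone sketch of that pattern, assuming the concurrent-ruby and perfect_retry gems (names such as with_thread_pool and RetriableError are made up for the example):

    require 'concurrent'      # concurrent-ruby: Concurrent::ThreadPoolExecutor
    require 'perfect_retry'   # perfect_retry: PerfectRetry

    class RetriableError < StandardError; end   # stands in for the plugin's TempError

    def with_thread_pool(retry_limit: 3, initial_wait: 1)
      pool = Concurrent::ThreadPoolExecutor.new(
        min_threads: 10,
        max_threads: 100,
        max_queue: 10_000,
        fallback_policy: :caller_runs   # when the queue is full, run the task on the calling thread
      )
      retryer = PerfectRetry.new do |config|
        config.limit = retry_limit
        config.rescues = [RetriableError]
        config.sleep = ->(n) { initial_wait * (2**(n - 1)) }   # exponential backoff
      end
      retryer.with_retry { yield(pool) }   # a RetriableError re-runs the whole block
    ensure
      pool.shutdown                        # always drain the pool, even on failure
      pool.wait_for_termination
    end

    # Usage: post page fetches to the pool; the helper blocks until they finish.
    with_thread_pool do |pool|
      (1..5).each do |page|
        pool.post { puts "fetching page #{page}" }
      end
    end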