lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.1.3 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.1.4

- old
+ new

@@ -1,14 +1,16 @@ +require "strscan" require "httpclient" module Embulk module Input module Zendesk class Client attr_reader :config PARTIAL_RECORDS_SIZE = 50 + PARTIAL_RECORDS_BYTE_SIZE = 50000 AVAILABLE_INCREMENTAL_EXPORT = %w(tickets users organizations ticket_events).freeze UNAVAILABLE_INCREMENTAL_EXPORT = %w(ticket_fields ticket_forms ticket_metrics).freeze AVAILABLE_TARGETS = AVAILABLE_INCREMENTAL_EXPORT + UNAVAILABLE_INCREMENTAL_EXPORT def initialize(config) @@ -50,23 +52,20 @@ end # they have both Incremental API and non-incremental API %w(tickets users organizations).each do |target| define_method(target) do |partial = true, start_time = 0, &block| - if partial - export("/api/v2/#{target}.json", target, PARTIAL_RECORDS_SIZE, &block) # Ignore start_time - else - incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], &block) - end + # Always use incremental_export. There is some difference between incremental_export and export. + incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], partial, &block) end end # they have incremental API only %w(ticket_events).each do |target| define_method(target) do |partial = true, start_time = 0, &block| path = "/api/v2/incremental/#{target}" - incremental_export(path, target, start_time, [], &block) + incremental_export(path, target, start_time, [], partial, &block) end end # they have non-incremental API only UNAVAILABLE_INCREMENTAL_EXPORT.each do |target| @@ -107,36 +106,40 @@ end nil # this is necessary different with incremental_export end - def incremental_export(path, key, start_time = 0, known_ids = [], &block) - # for `embulk run` to fetch all records. - response = request(path, start_time: start_time) - - begin - data = JSON.parse(response.body) - rescue => e - raise Embulk::DataError.new(e) + def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, &block) + if partial + records = request_partial(path, {start_time: start_time}).first(5) + else + response = request(path, {start_time: start_time}) + begin + data = JSON.parse(response.body) + rescue => e + raise Embulk::DataError.new(e) + end + Embulk.logger.debug "start_time:#{start_time} (#{Time.at(start_time)}) count:#{data["count"]} next_page:#{data["next_page"]} end_time:#{data["end_time"]} " + records = data[key] end - Embulk.logger.debug "start_time:#{start_time} (#{Time.at(start_time)}) count:#{data["count"]} next_page:#{data["next_page"]} end_time:#{data["end_time"]} " - data[key].each do |record| + records.each do |record| # de-duplicated records. # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes # https://github.com/zendesk/zendesk_api_client_rb/issues/251 next if known_ids.include?(record["id"]) known_ids << record["id"] block.call record end + return if partial # NOTE: If count is less than 1000, then stop paginating. # Otherwise, use the next_page URL to get the next page of results. # https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination if data["count"] == 1000 - incremental_export(path, key, data["end_time"], known_ids, &block) + incremental_export(path, key, data["end_time"], known_ids, partial, &block) else data end end @@ -173,43 +176,102 @@ retryer.with_retry do Embulk.logger.debug "Fetching #{u.to_s}" response = httpclient.get(u.to_s, query, follow_redirect: true) - # https://developer.zendesk.com/rest_api/docs/core/introduction#response-format - status_code = response.status - case status_code - when 200, 404 - # 404 would be returned e.g. ticket comments are empty (on fetch_subresource method) - response - when 400, 401 - raise Embulk::ConfigError.new("[#{status_code}] #{response.body}") - when 409 - raise "[#{status_code}] temporally failure." - when 429 - # rate limit - retry_after = response.headers["Retry-After"] - wait_rate_limit(retry_after.to_i) - when 500, 503 - # 503 is possible rate limit - retry_after = response.headers["Retry-After"] - if retry_after - wait_rate_limit(retry_after.to_i) - else - raise "[#{status_code}] temporally failure." + handle_response(response.status, response.headers, response.body) + response + end + end + + def request_partial(path, query = {}) + # NOTE: This is a dirty hack for quick response using incremental_export API. + # Disconnect socket when received PARTIAL_RECORDS_BYTE_SIZE bytes, + # And extract valid JSONs from received bytes (extract_valid_json_from_chunk method) + u = URI.parse(config[:login_url]) + u.path = path + + retryer.with_retry do + Embulk.logger.debug "Fetching #{u.to_s}" + buf = "" + auth_retry = 0 + httpclient.get(u.to_s, query, follow_redirect: true) do |message, chunk| + if message.status == 401 + # First request will fail by 401 because not included credentials. + # HTTPClient will retry request with credentials. + if auth_retry.zero? + auth_retry += 1 + next + end end - else - raise "Server returns unknown status code (#{status_code})" + handle_response(message.status, message.headers, chunk) + + buf << chunk + break if buf.bytesize > PARTIAL_RECORDS_BYTE_SIZE end + extract_valid_json_from_chunk(buf).map do |json| + JSON.parse(json) + end end end + def extract_valid_json_from_chunk(chunk) + # Drip JSON objects from incomplete string + # + # e.g.: + # chunk = '{"ticket_events":[{"foo":1},{"foo":2},{"fo' + # extract_valid_json_from_chunk(chunk) #=> ['{"foo":1}', '{"foo":2}'] + result = [] + + # omit '{"tickets":[' prefix. See test/fixtures/tickets.json for actual response. + s = StringScanner.new(chunk.scrub.gsub(%r!^{".*?":\[!,"")) + while !s.eos? + opener = s.scan(/{/) + break unless opener + buf = opener # Initialize `buf` as "{" + while content = s.scan(/.*?}/) # grab data from start to next "}" + buf << content + if (JSON.parse(buf) rescue false) # if JSON.parse success, `buf` is valid JSON. we'll take it. + result << buf.dup + break + end + end + s.scan(/[^{]*/) # skip until next "{". `chunk` has comma separeted objects like '},{'. skip that comma. + end + result + end + def wait_rate_limit(retry_after) Embulk.logger.warn "Rate Limited. Waiting #{retry_after} seconds to retry" sleep retry_after throw :retry end + def handle_response(status_code, headers, body) + # https://developer.zendesk.com/rest_api/docs/core/introduction#response-format + case status_code + when 200, 404 + # 404 would be returned e.g. ticket comments are empty (on fetch_subresource method) + when 400, 401 + raise Embulk::ConfigError.new("[#{status_code}] #{body}") + when 409 + raise "[#{status_code}] temporally failure." + when 429 + # rate limit + retry_after = headers["Retry-After"] + wait_rate_limit(retry_after.to_i) + when 500, 503 + # 503 is possible rate limit + retry_after = headers["Retry-After"] + if retry_after + wait_rate_limit(retry_after.to_i) + else + raise "[#{status_code}] temporally failure." + end + else + raise "Server returns unknown status code (#{status_code})" + end + end end end end end