lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.10 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.11

- old
+ new

@@ -128,11 +128,11 @@ private def export_parallel(path, key, start_time = 0, &block) per_page = 100 # 100 is maximum https://developer.zendesk.com/rest_api/docs/core/introduction#pagination - first_response = request(path, per_page: per_page, page: 1) + first_response = request(path, false, per_page: per_page, page: 1) first_fetched = JSON.parse(first_response.body) total_count = first_fetched["count"] last_page_num = (total_count / per_page.to_f).ceil Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}" @@ -141,11 +141,11 @@ end execute_thread_pool do |pool| (2..last_page_num).each do |page| pool.post do - response = request(path, per_page: per_page, page: page) + response = request(path, false, per_page: per_page, page: page) fetched_records = extract_records_from_response(response, key) Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}" fetched_records.uniq { |r| r['id'] }.each do |record| block.call record end @@ -158,11 +158,11 @@ def export(path, key, page = 1, &block) per_page = PARTIAL_RECORDS_SIZE Embulk.logger.info("Fetching #{path} with page=#{page} (partial)") - response = request(path, per_page: per_page, page: page) + response = request(path, true, per_page: per_page, page: page) begin data = JSON.parse(response.body) raise "Invalid data format: #{key} must be array" unless data.key?(key) && data[key].is_a?(Array) rescue => e @@ -184,11 +184,11 @@ end execute_thread_pool do |pool| loop do start_fetching = Time.now - response = request(path, query.merge(start_time: start_time)) + response = request(path, false, query.merge(start_time: start_time)) actual_fetched = 0 data = JSON.parse(response.body) # no key found in response occasionally => retry raise TempError, "No '#{key}' found in JSON response" unless data.key? key data[key].each do |record| @@ -259,11 +259,11 @@ } end httpclient end - def request(path, query = {}) + def request(path, partial = false, query = {}) u = URI.parse(config[:login_url]) u.path = path # https://help.zendesk.com/hc/en-us/articles/115010249348-Announcing-Updated-Apps-Marketplace-API-Header-Requirementsmerg extheader = {} @@ -275,12 +275,11 @@ end retryer.with_retry do Embulk.logger.debug "Fetching #{u.to_s}" response = httpclient.get(u.to_s, query, extheader) - - handle_response(response.status, response.headers, response.body) + handle_response(response.status, response.headers, response.body, partial) response end end def request_partial(path, query = {}) @@ -310,12 +309,11 @@ if auth_retry.zero? auth_retry += 1 next end end - handle_response(message.status, message.headers, chunk) - + handle_response(message.status, message.headers, chunk, true) buf << chunk break if buf.bytesize > PARTIAL_RECORDS_BYTE_SIZE end extract_valid_json_from_chunk(buf).map do |json| JSON.parse(json) @@ -347,17 +345,22 @@ s.scan(/[^{]*/) # skip until next "{". `chunk` has comma separeted objects like '},{'. skip that comma. end result end - def wait_rate_limit(retry_after) - Embulk.logger.warn "Rate Limited. Waiting #{retry_after} seconds to retry" - sleep retry_after - throw :retry + def wait_rate_limit(retry_after, partial = false) + # Won't retry for preview/guess mode + if partial + raise Embulk::DataError.new("Rate Limited. Waiting #{retry_after} seconds to re-run") + else + Embulk.logger.warn "Rate Limited. Waiting #{retry_after} seconds to retry" + sleep retry_after + throw :retry + end end - def handle_response(status_code, headers, body) + def handle_response(status_code, headers, body, partial = false) # https://developer.zendesk.com/rest_api/docs/core/introduction#response-format case status_code when 200, 404 # 404 would be returned e.g. ticket comments are empty (on fetch_subresource method) when 409 @@ -377,18 +380,18 @@ # 422 and it isn't "Too recent start_time" raise Embulk::ConfigError.new("[#{status_code}] #{body}") when 429 # rate limit retry_after = headers["Retry-After"] - wait_rate_limit(retry_after.to_i) + wait_rate_limit(retry_after.to_i, partial) when 400..500 # Won't retry for 4xx range errors except above. Almost they should be ConfigError e.g. 403 Forbidden raise Embulk::ConfigError.new("[#{status_code}] #{body}") when 500, 503 # 503 is possible rate limit retry_after = headers["Retry-After"] if retry_after - wait_rate_limit(retry_after.to_i) + wait_rate_limit(retry_after.to_i, partial) else raise "[#{status_code}] temporally failure." end else raise "Server returns unknown status code (#{status_code}) #{body}"