lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.10 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.11
- old
+ new
@@ -128,11 +128,11 @@
private
def export_parallel(path, key, start_time = 0, &block)
per_page = 100 # 100 is maximum https://developer.zendesk.com/rest_api/docs/core/introduction#pagination
- first_response = request(path, per_page: per_page, page: 1)
+ first_response = request(path, false, per_page: per_page, page: 1)
first_fetched = JSON.parse(first_response.body)
total_count = first_fetched["count"]
last_page_num = (total_count / per_page.to_f).ceil
Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"
@@ -141,11 +141,11 @@
end
execute_thread_pool do |pool|
(2..last_page_num).each do |page|
pool.post do
- response = request(path, per_page: per_page, page: page)
+ response = request(path, false, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
fetched_records.uniq { |r| r['id'] }.each do |record|
block.call record
end
@@ -158,11 +158,11 @@
def export(path, key, page = 1, &block)
per_page = PARTIAL_RECORDS_SIZE
Embulk.logger.info("Fetching #{path} with page=#{page} (partial)")
- response = request(path, per_page: per_page, page: page)
+ response = request(path, true, per_page: per_page, page: page)
begin
data = JSON.parse(response.body)
raise "Invalid data format: #{key} must be array" unless data.key?(key) && data[key].is_a?(Array)
rescue => e
@@ -184,11 +184,11 @@
end
execute_thread_pool do |pool|
loop do
start_fetching = Time.now
- response = request(path, query.merge(start_time: start_time))
+ response = request(path, false, query.merge(start_time: start_time))
actual_fetched = 0
data = JSON.parse(response.body)
# no key found in response occasionally => retry
raise TempError, "No '#{key}' found in JSON response" unless data.key? key
data[key].each do |record|
@@ -259,11 +259,11 @@
}
end
httpclient
end
- def request(path, query = {})
+ def request(path, partial = false, query = {})
u = URI.parse(config[:login_url])
u.path = path
# https://help.zendesk.com/hc/en-us/articles/115010249348-Announcing-Updated-Apps-Marketplace-API-Header-Requirementsmerg
extheader = {}
@@ -275,12 +275,11 @@
end
retryer.with_retry do
Embulk.logger.debug "Fetching #{u.to_s}"
response = httpclient.get(u.to_s, query, extheader)
-
- handle_response(response.status, response.headers, response.body)
+ handle_response(response.status, response.headers, response.body, partial)
response
end
end
def request_partial(path, query = {})
@@ -310,12 +309,11 @@
if auth_retry.zero?
auth_retry += 1
next
end
end
- handle_response(message.status, message.headers, chunk)
-
+ handle_response(message.status, message.headers, chunk, true)
buf << chunk
break if buf.bytesize > PARTIAL_RECORDS_BYTE_SIZE
end
extract_valid_json_from_chunk(buf).map do |json|
JSON.parse(json)
@@ -347,17 +345,22 @@
s.scan(/[^{]*/) # skip until next "{". `chunk` has comma separeted objects like '},{'. skip that comma.
end
result
end
- def wait_rate_limit(retry_after)
- Embulk.logger.warn "Rate Limited. Waiting #{retry_after} seconds to retry"
- sleep retry_after
- throw :retry
+ def wait_rate_limit(retry_after, partial = false)
+ # Won't retry for preview/guess mode
+ if partial
+ raise Embulk::DataError.new("Rate Limited. Waiting #{retry_after} seconds to re-run")
+ else
+ Embulk.logger.warn "Rate Limited. Waiting #{retry_after} seconds to retry"
+ sleep retry_after
+ throw :retry
+ end
end
- def handle_response(status_code, headers, body)
+ def handle_response(status_code, headers, body, partial = false)
# https://developer.zendesk.com/rest_api/docs/core/introduction#response-format
case status_code
when 200, 404
# 404 would be returned e.g. ticket comments are empty (on fetch_subresource method)
when 409
@@ -377,18 +380,18 @@
# 422 and it isn't "Too recent start_time"
raise Embulk::ConfigError.new("[#{status_code}] #{body}")
when 429
# rate limit
retry_after = headers["Retry-After"]
- wait_rate_limit(retry_after.to_i)
+ wait_rate_limit(retry_after.to_i, partial)
when 400..500
# Won't retry for 4xx range errors except above. Almost they should be ConfigError e.g. 403 Forbidden
raise Embulk::ConfigError.new("[#{status_code}] #{body}")
when 500, 503
# 503 is possible rate limit
retry_after = headers["Retry-After"]
if retry_after
- wait_rate_limit(retry_after.to_i)
+ wait_rate_limit(retry_after.to_i, partial)
else
raise "[#{status_code}] temporally failure."
end
else
raise "Server returns unknown status code (#{status_code}) #{body}"