lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.1.3 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.1.4
- old
+ new
@@ -1,14 +1,16 @@
+require "strscan"
require "httpclient"
module Embulk
module Input
module Zendesk
class Client
attr_reader :config
PARTIAL_RECORDS_SIZE = 50
+ PARTIAL_RECORDS_BYTE_SIZE = 50000
AVAILABLE_INCREMENTAL_EXPORT = %w(tickets users organizations ticket_events).freeze
UNAVAILABLE_INCREMENTAL_EXPORT = %w(ticket_fields ticket_forms ticket_metrics).freeze
AVAILABLE_TARGETS = AVAILABLE_INCREMENTAL_EXPORT + UNAVAILABLE_INCREMENTAL_EXPORT
def initialize(config)
@@ -50,23 +52,20 @@
end
# they have both Incremental API and non-incremental API
# Defines #tickets, #users and #organizations, each with the signature
# (partial = true, start_time = 0, &block).
# The "+" lines send every call to incremental_export on
# "/api/v2/incremental/<target>.json" with a fresh [] as known_ids, forwarding
# `partial` and the block. The "-" lines are the earlier branch, which called
# export (ignoring start_time) whenever partial was true.
%w(tickets users organizations).each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
- if partial
- export("/api/v2/#{target}.json", target, PARTIAL_RECORDS_SIZE, &block) # Ignore start_time
- else
- incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], &block)
- end
+ # Always use incremental_export. There is some difference between incremental_export and export.
+ incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], partial, &block)
end
end
# they have incremental API only
# Defines #ticket_events(partial = true, start_time = 0, &block).
# Unlike the targets above, the path is built without a ".json" suffix.
# The "+" line additionally forwards `partial` to incremental_export.
%w(ticket_events).each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
path = "/api/v2/incremental/#{target}"
- incremental_export(path, target, start_time, [], &block)
+ incremental_export(path, target, start_time, [], partial, &block)
end
end
# they have non-incremental API only
UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
@@ -107,36 +106,40 @@
end
nil # this is necessary different with incremental_export
end
# incremental_export(path, key, start_time = 0, known_ids = [], partial = true, &block)
#
# Behaviour of the added ("+") version:
#   * partial == true: takes at most the first 5 records returned by
#     request_partial(path, start_time: ...), yields the unseen ones to the
#     block and returns nil.
#   * partial == false: performs a full request, parses the body as JSON
#     (a parse failure is re-raised as Embulk::DataError), yields the unseen
#     records found under data[key], and calls itself again with
#     data["end_time"] as the new start_time while data["count"] == 1000.
#     The parsed hash of the last page is returned.
# `known_ids` is appended to in place, so ids are remembered across the
# recursive calls.
# The "-" lines are the previous implementation, which had no partial mode.
- def incremental_export(path, key, start_time = 0, known_ids = [], &block)
- # for `embulk run` to fetch all records.
- response = request(path, start_time: start_time)
-
- begin
- data = JSON.parse(response.body)
- rescue => e
- raise Embulk::DataError.new(e)
+ def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, &block)
+ if partial
+ records = request_partial(path, {start_time: start_time}).first(5)
+ else
+ response = request(path, {start_time: start_time})
+ begin
+ data = JSON.parse(response.body)
+ rescue => e
+ raise Embulk::DataError.new(e)
+ end
+ Embulk.logger.debug "start_time:#{start_time} (#{Time.at(start_time)}) count:#{data["count"]} next_page:#{data["next_page"]} end_time:#{data["end_time"]} "
+ records = data[key]
end
- Embulk.logger.debug "start_time:#{start_time} (#{Time.at(start_time)}) count:#{data["count"]} next_page:#{data["next_page"]} end_time:#{data["end_time"]} "
- data[key].each do |record|
+ records.each do |record|
# de-duplicated records.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
# https://github.com/zendesk/zendesk_api_client_rb/issues/251
next if known_ids.include?(record["id"])
known_ids << record["id"]
block.call record
end
# In partial mode `data` was never assigned, so stop here (returns nil)
# before the pagination check below reads it.
+ return if partial
# NOTE: If count is less than 1000, then stop paginating.
# Otherwise, use the next_page URL to get the next page of results.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
if data["count"] == 1000
- incremental_export(path, key, data["end_time"], known_ids, &block)
+ incremental_export(path, key, data["end_time"], known_ids, partial, &block)
else
data
end
end
@@ -173,43 +176,102 @@
retryer.with_retry do
Embulk.logger.debug "Fetching #{u.to_s}"
response = httpclient.get(u.to_s, query, follow_redirect: true)
- # https://developer.zendesk.com/rest_api/docs/core/introduction#response-format
- status_code = response.status
- case status_code
- when 200, 404
- # 404 would be returned e.g. ticket comments are empty (on fetch_subresource method)
- response
- when 400, 401
- raise Embulk::ConfigError.new("[#{status_code}] #{response.body}")
- when 409
- raise "[#{status_code}] temporally failure."
- when 429
- # rate limit
- retry_after = response.headers["Retry-After"]
- wait_rate_limit(retry_after.to_i)
- when 500, 503
- # 503 is possible rate limit
- retry_after = response.headers["Retry-After"]
- if retry_after
- wait_rate_limit(retry_after.to_i)
- else
- raise "[#{status_code}] temporally failure."
+ handle_response(response.status, response.headers, response.body)
+ response
+ end
+ end
+
# request_partial(path, query = {})
#
# Streams a GET for `path` (resolved against config[:login_url]) and stops
# reading once more than PARTIAL_RECORDS_BYTE_SIZE bytes have been buffered.
# The buffered text is split into complete JSON objects by
# extract_valid_json_from_chunk and each one is parsed with JSON.parse.
# Every received chunk's status is checked through handle_response, so the
# errors and retry throws of that method can surface from here.
# NOTE(review): the parsed array is the value of the with_retry block;
# presumably with_retry returns it (incremental_export calls `.first(5)` on
# the result) - confirm against the retryer implementation.
+ def request_partial(path, query = {})
+ # NOTE: This is a dirty hack for quick response using incremental_export API.
+ # Disconnect socket when received PARTIAL_RECORDS_BYTE_SIZE bytes,
+ # And extract valid JSONs from received bytes (extract_valid_json_from_chunk method)
+ u = URI.parse(config[:login_url])
+ u.path = path
+
+ retryer.with_retry do
+ Embulk.logger.debug "Fetching #{u.to_s}"
# Both locals are reset on every attempt because they live inside the
# with_retry block.
+ buf = ""
+ auth_retry = 0
+ httpclient.get(u.to_s, query, follow_redirect: true) do |message, chunk|
+ if message.status == 401
+ # First request will fail by 401 because not included credentials.
+ # HTTPClient will retry request with credentials.
+ if auth_retry.zero?
+ auth_retry += 1
+ next
+ end
end
# NOTE(review): the two "-" lines below are removed lines of the previous
# status-code `case` that the diff interleaves here; they are not part of
# request_partial.
- else
- raise "Server returns unknown status code (#{status_code})"
# Only the first 401 chunk is skipped; a later 401 reaches handle_response,
# which raises Embulk::ConfigError for that status.
+ handle_response(message.status, message.headers, chunk)
+
+ buf << chunk
# `break` leaves the streaming block passed to httpclient.get.
+ break if buf.bytesize > PARTIAL_RECORDS_BYTE_SIZE
end
+ extract_valid_json_from_chunk(buf).map do |json|
+ JSON.parse(json)
+ end
end
end
# Extracts every complete top-level JSON object from a possibly truncated
# response body.
#
# e.g.:
#   chunk = '{"ticket_events":[{"foo":1},{"foo":2},{"fo'
#   extract_valid_json_from_chunk(chunk) #=> ['{"foo":1}', '{"foo":2}']
#
# chunk   - String holding the beginning of a JSON document shaped like
#           '{"<key>":[{...},{...},...'. Invalid byte sequences are scrubbed
#           before scanning.
# Returns an Array of Strings, each of which parses as a JSON object. A
# trailing record that was cut off is dropped.
def extract_valid_json_from_chunk(chunk)
  # Omit the '{"tickets":[' prefix. Only the single leading envelope is
  # removed (\A + sub): a line anchor with gsub would also delete any later
  # line that happens to start with '{"...":['.
  body = chunk.scrub.sub(/\A\s*\{".*?":\[/m, "")
  scanner = StringScanner.new(body)
  result = []

  until scanner.eos?
    buf = scanner.scan(/\{/) # candidate object, starts as "{"
    break unless buf

    # Grow the candidate up to each following "}" until it parses. A "}"
    # inside a string value or a nested object simply leads to another round.
    # /m lets the scan cross newlines, which JSON permits between tokens.
    while (content = scanner.scan(/.*?\}/m))
      buf << content
      complete =
        begin
          JSON.parse(buf)
          true
        rescue JSON::ParserError
          false
        end
      next unless complete

      result << buf.dup
      break
    end

    # Skip until the next "{". The chunk has comma separated objects like
    # '},{', so this drops that comma (and the rest of a truncated record).
    scanner.scan(/[^{]*/)
  end

  result
end
+
# Logs a rate-limit warning, sleeps for `retry_after` seconds and then
# unwinds with `throw :retry`.
# NOTE(review): the matching `catch(:retry)` is not visible here; presumably
# it lives in the retryer used by the request methods - confirm.
def wait_rate_limit(retry_after)
  notice = "Rate Limited. Waiting #{retry_after} seconds to retry"
  Embulk.logger.warn(notice)
  sleep(retry_after)
  throw(:retry)
end
# Maps an HTTP status code to the action the client takes.
#
# status_code - Integer HTTP status
# headers     - Hash-like collection of response headers; only Retry-After
#               is consulted
# body        - response body (or chunk), used only in the 400/401 message
#
# Returns nil for 200 and 404.
# Raises Embulk::ConfigError for 400/401, RuntimeError for 409, for 500/503
# without a Retry-After header, and for any other status.
# For 429, and for 500/503 carrying Retry-After, control is handed to
# wait_rate_limit, which sleeps and throws :retry.
def handle_response(status_code, headers, body)
  # https://developer.zendesk.com/rest_api/docs/core/introduction#response-format
  case status_code
  when 200, 404
    # 404 would be returned e.g. ticket comments are empty (on fetch_subresource method)
    nil
  when 400, 401
    raise Embulk::ConfigError.new("[#{status_code}] #{body}")
  when 409
    raise "[#{status_code}] temporally failure."
  when 429
    # rate limit. A missing header becomes 0 via nil.to_i, i.e. no sleep
    # before the retry.
    wait_rate_limit(retry_after_header(headers).to_i)
  when 500, 503
    # 503 is possible rate limit
    retry_after = retry_after_header(headers)
    raise "[#{status_code}] temporally failure." unless retry_after
    wait_rate_limit(retry_after.to_i)
  else
    raise "Server returns unknown status code (#{status_code})"
  end
end

# Looks up the Retry-After header. HTTP header names are case-insensitive,
# so "retry-after" or "RETRY-AFTER" must be honoured as well.
# Returns the header value, or nil when it is absent.
def retry_after_header(headers)
  return nil unless headers
  exact = headers["Retry-After"]
  return exact if exact || !headers.respond_to?(:find)

  pair = headers.find { |name, _value| name.to_s.casecmp("retry-after") == 0 }
  pair && pair.last
end
end
end
end
end