lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.12 vs lib/embulk/input/zendesk/client.rb in embulk-input-zendesk-0.2.13

- old
+ new

@@ -80,27 +80,27 @@ end # they have both Incremental API and non-incremental API # 170717: `ticket_events` can use standard endpoint format now, ie. `<target>.json` %w(tickets ticket_events users organizations).each do |target| - define_method(target) do |partial = true, start_time = 0, &block| + define_method(target) do |partial = true, start_time = 0, dedup = true, &block| # Always use incremental_export. There is some difference between incremental_export and export. - incremental_export("/api/v2/incremental/#{target}.json", target, start_time, Set.new, partial, &block) + incremental_export("/api/v2/incremental/#{target}.json", target, start_time, dedup, Set.new, partial, &block) end end # Ticket metrics will need to be export using both the non incremental and incremental on ticket # We provide support by filter out ticket_metrics with created at smaller than start time # while passing the incremental start time to the incremental ticket/ticket_metrics export - define_method('ticket_metrics') do |partial = true, start_time = 0, &block| + define_method('ticket_metrics') do |partial = true, start_time = 0, dedup = true, &block| if partial # If partial export then we need to use the old end point. Since new end point return both ticket and # ticket metric with ticket come first so the current approach that cut off the response packet won't work # Since partial is only use for preview and guess so this should be fine export('/api/v2/ticket_metrics.json', 'ticket_metrics', &block) else - incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, Set.new, partial, { include: 'metric_sets' }, &block) + incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, dedup, Set.new, partial, { include: 'metric_sets' }, &block) end end # they have non-incremental API only UNAVAILABLE_INCREMENTAL_EXPORT.each do |target| @@ -173,19 +173,23 @@ data[key].each do |record| block.call record end end - def incremental_export(path, key, start_time = 0, known_ids = Set.new, partial = true, query = {}, &block) + def incremental_export(path, key, start_time = 0, dedup = true, known_ids = Set.new, partial = true, query = {}, &block) if partial records = request_partial(path, query.merge(start_time: start_time)).first(5) records.uniq{|r| r["id"]}.each do |record| block.call record end return end + if !dedup + Embulk.logger.warn("!!! You've selected to skip de-duplicating records, result may contain duplicated data !!!") + end + execute_thread_pool do |pool| loop do start_fetching = Time.now response = request(path, false, query.merge(start_time: start_time)) actual_fetched = 0 @@ -206,12 +210,14 @@ end # de-duplicated records. # https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes # https://github.com/zendesk/zendesk_api_client_rb/issues/251 - next if known_ids.include?(record["id"]) + if dedup + next if known_ids.include?(record["id"]) + known_ids << record["id"] + end - known_ids << record["id"] pool.post { block.call record } actual_fetched += 1 end Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds" start_time = data["end_time"]