lib/embulk/input/zendesk/plugin.rb in embulk-input-zendesk-0.1.1 vs lib/embulk/input/zendesk/plugin.rb in embulk-input-zendesk-0.1.2

- old
+ new

@@ -16,17 +16,25 @@ type = column["type"].to_sym Column.new(nil, name, type, column["format"]) end + if task[:incremental] && !Client::AVAILABLE_INCREMENTAL_EXPORT.include?(task[:target]) + Embulk.logger.warn "target: #{task[:target]} don't support incremental export API. Will be ignored start_time option" + end + resume(task, columns, 1, &control) end def self.resume(task, columns, count, &control) task_reports = yield(task, columns, count) + report = task_reports.first next_config_diff = {} + if report[:start_time] + next_config_diff[:start_time] = report[:start_time] + end return next_config_diff end def self.guess(config) task = config_to_task(config) @@ -69,10 +77,11 @@ token: config.param("token", :string, default: nil), access_token: config.param("access_token", :string, default: nil), start_time: config.param("start_time", :string, default: nil), retry_limit: config.param("retry_limit", :integer, default: 5), retry_initial_wait_sec: config.param("retry_initial_wait_sec", :integer, default: 1), + incremental: config.param("incremental", :bool, default: true), schema: config.param(:columns, :array, default: []), } end def init @@ -85,17 +94,24 @@ if !preview? && @start_time args << @start_time.to_i end client = Client.new(task) - client.public_send(method, *args) do |record| + last_data = client.public_send(method, *args) do |record| values = extract_values(record) page_builder.add(values) end - page_builder.finish task_report = {} + if task[:incremental] && last_data && last_data["end_time"] + # NOTE: start_time compared as "=>", not ">". + # If we will use end_time for next start_time, we got the same record that is last fetched + # end_time + 1 is workaround for that + next_start_time = Time.at(last_data["end_time"] + 1) + task_report[:start_time] = next_start_time.strftime("%Y-%m-%d %H:%M:%S%z") + end + return task_report end private