lib/embulk/input/zendesk/plugin.rb in embulk-input-zendesk-0.1.1 vs lib/embulk/input/zendesk/plugin.rb in embulk-input-zendesk-0.1.2
- old
+ new
@@ -16,17 +16,25 @@
type = column["type"].to_sym
Column.new(nil, name, type, column["format"])
end
+ if task[:incremental] && !Client::AVAILABLE_INCREMENTAL_EXPORT.include?(task[:target])
+ Embulk.logger.warn "target: #{task[:target]} don't support incremental export API. Will be ignored start_time option"
+ end
+
resume(task, columns, 1, &control)
end
def self.resume(task, columns, count, &control)
task_reports = yield(task, columns, count)
+ report = task_reports.first
next_config_diff = {}
+ if report[:start_time]
+ next_config_diff[:start_time] = report[:start_time]
+ end
return next_config_diff
end
def self.guess(config)
task = config_to_task(config)
@@ -69,10 +77,11 @@
token: config.param("token", :string, default: nil),
access_token: config.param("access_token", :string, default: nil),
start_time: config.param("start_time", :string, default: nil),
retry_limit: config.param("retry_limit", :integer, default: 5),
retry_initial_wait_sec: config.param("retry_initial_wait_sec", :integer, default: 1),
+ incremental: config.param("incremental", :bool, default: true),
schema: config.param(:columns, :array, default: []),
}
end
def init
@@ -85,17 +94,24 @@
if !preview? && @start_time
args << @start_time.to_i
end
client = Client.new(task)
- client.public_send(method, *args) do |record|
+ last_data = client.public_send(method, *args) do |record|
values = extract_values(record)
page_builder.add(values)
end
-
page_builder.finish
task_report = {}
+ if task[:incremental] && last_data && last_data["end_time"]
+ # NOTE: start_time compared as "=>", not ">".
+ # If we will use end_time for next start_time, we got the same record that is last fetched
+ # end_time + 1 is workaround for that
+ next_start_time = Time.at(last_data["end_time"] + 1)
+ task_report[:start_time] = next_start_time.strftime("%Y-%m-%d %H:%M:%S%z")
+ end
+
return task_report
end
private