lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.3.2 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.4.0
- old
+ new
@@ -24,34 +24,42 @@
end
def self.resume(task, columns, count, &control)
task_reports = yield(task, columns, count)
- # When no task ran, task_reports is empty
- return {} if task_reports.empty?
- # all task returns same report as {from_datetime: to_datetime}
- return task_reports.first
+ return {}
end
def self.transaction(config, &control)
endpoint_url = config.param(:endpoint, :string)
range = format_range(config)
-
ranges = timeslice(range[:from], range[:to], TIMESLICE_COUNT_PER_TASK)
+
+ append_processed_time_column = config.param(:append_processed_time_column, :bool, default: true)
+
task = {
endpoint_url: endpoint_url,
wsdl_url: config.param(:wsdl, :string, default: "#{endpoint_url}?WSDL"),
user_id: config.param(:user_id, :string),
encryption_key: config.param(:encryption_key, :string),
from_datetime: range[:from],
to_datetime: range[:to],
ranges: ranges,
- columns: config.param(:columns, :array)
+ retry_initial_wait_sec: config.param(:retry_initial_wait_sec, :integer, default: 1),
+ retry_limit: config.param(:retry_limit, :integer, default: 5),
+ append_processed_time_column: append_processed_time_column,
+ columns: config.param(:columns, :array),
}
- resume(task, embulk_columns(config), ranges.size, &control)
+ columns = embulk_columns(config)
+ if append_processed_time_column
+ processed_time_column = Column.new(nil, :processed_time, :timestamp, "%Y-%m-%dT%H:%M:%S%z")
+ columns << processed_time_column
+ end
+
+ resume(task, columns, ranges.size, &control)
end
def self.generate_columns(metadata)
columns = [
{name: "id", type: "long"},
@@ -83,34 +91,42 @@
def init
@columns = task[:columns]
@ranges = task[:ranges][index]
@soap = MarketoApi.soap_client(task, target)
+ @append_processed_time_column = task[:append_processed_time_column]
end
def run
- options = {}
+ options = {
+ retry_initial_wait_sec: task[:retry_initial_wait_sec],
+ retry_limit: task[:retry_limit],
+ }
options[:batch_size] = PREVIEW_COUNT if preview?
counter = 0
- @ranges.each do |range|
- soap.each(range, options) do |lead|
- values = @columns.map do |column|
- name = column["name"].to_s
- value = (lead[name] || {})[:value]
- cast_value(column, value)
- end
+ catch(:finish) do
+ @ranges.each do |range|
+ soap.each(range, options) do |lead|
+ values = @columns.map do |column|
+ name = column["name"].to_s
+ value = (lead[name] || {})[:value]
+ cast_value(column, value)
+ end
- page_builder.add(values)
- break if preview? && (counter += 1) >= PREVIEW_COUNT
+ if @append_processed_time_column
+ values << Time.parse(range["from"])
+ end
+
+ page_builder.add(values)
+ throw(:finish) if preview? && (counter += 1) >= PREVIEW_COUNT
+ end
end
end
page_builder.finish
- task_report = {
- from_datetime: task[:to_datetime]
- }
+ task_report = {}
return task_report
end
end
end
end