lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.4.0 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.5.0
- old
+ new
@@ -92,42 +92,44 @@
def init
@columns = task[:columns]
@ranges = task[:ranges][index]
@soap = MarketoApi.soap_client(task, target)
@append_processed_time_column = task[:append_processed_time_column]
- end
-
- def run
- options = {
+ @options = {
retry_initial_wait_sec: task[:retry_initial_wait_sec],
retry_limit: task[:retry_limit],
}
- options[:batch_size] = PREVIEW_COUNT if preview?
+ @options[:batch_size] = PREVIEW_COUNT if preview?
+ end
+ def run
counter = 0
catch(:finish) do
@ranges.each do |range|
- soap.each(range, options) do |lead|
- values = @columns.map do |column|
- name = column["name"].to_s
- value = (lead[name] || {})[:value]
- cast_value(column, value)
- end
-
- if @append_processed_time_column
- values << Time.parse(range["from"])
- end
-
- page_builder.add(values)
+ soap.each(range, @options) do |lead|
+ page_builder.add(format_record(lead, range))
throw(:finish) if preview? && (counter += 1) >= PREVIEW_COUNT
end
end
end
page_builder.finish
task_report = {}
return task_report
+ end
+
+ def format_record(lead, range)
+ values = @columns.map do |column|
+ name = column["name"].to_s
+ value = (lead[name] || {})[:value]
+ cast_value(column, value)
+ end
+
+ if @append_processed_time_column
+ values << Time.parse(range["from"])
+ end
+ values
end
end
end
end
end