lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.4.0 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.5.0

- old
+ new

@@ -92,42 +92,44 @@ def init @columns = task[:columns] @ranges = task[:ranges][index] @soap = MarketoApi.soap_client(task, target) @append_processed_time_column = task[:append_processed_time_column] - end - - def run - options = { + @options = { retry_initial_wait_sec: task[:retry_initial_wait_sec], retry_limit: task[:retry_limit], } - options[:batch_size] = PREVIEW_COUNT if preview? + @options[:batch_size] = PREVIEW_COUNT if preview? + end + def run counter = 0 catch(:finish) do @ranges.each do |range| - soap.each(range, options) do |lead| - values = @columns.map do |column| - name = column["name"].to_s - value = (lead[name] || {})[:value] - cast_value(column, value) - end - - if @append_processed_time_column - values << Time.parse(range["from"]) - end - - page_builder.add(values) + soap.each(range, @options) do |lead| + page_builder.add(format_record(lead, range)) throw(:finish) if preview? && (counter += 1) >= PREVIEW_COUNT end end end page_builder.finish task_report = {} return task_report + end + + def format_record(lead, range) + values = @columns.map do |column| + name = column["name"].to_s + value = (lead[name] || {})[:value] + cast_value(column, value) + end + + if @append_processed_time_column + values << Time.parse(range["from"]) + end + values end end end end end