lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.3.2 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.4.0

- old
+ new

@@ -24,34 +24,42 @@ end def self.resume(task, columns, count, &control) task_reports = yield(task, columns, count) - # When no task ran, task_reports is empty - return {} if task_reports.empty? - # all task returns same report as {from_datetime: to_datetime} - return task_reports.first + return {} end def self.transaction(config, &control) endpoint_url = config.param(:endpoint, :string) range = format_range(config) - ranges = timeslice(range[:from], range[:to], TIMESLICE_COUNT_PER_TASK) + + append_processed_time_column = config.param(:append_processed_time_column, :bool, default: true) + task = { endpoint_url: endpoint_url, wsdl_url: config.param(:wsdl, :string, default: "#{endpoint_url}?WSDL"), user_id: config.param(:user_id, :string), encryption_key: config.param(:encryption_key, :string), from_datetime: range[:from], to_datetime: range[:to], ranges: ranges, - columns: config.param(:columns, :array) + retry_initial_wait_sec: config.param(:retry_initial_wait_sec, :integer, default: 1), + retry_limit: config.param(:retry_limit, :integer, default: 5), + append_processed_time_column: append_processed_time_column, + columns: config.param(:columns, :array), } - resume(task, embulk_columns(config), ranges.size, &control) + columns = embulk_columns(config) + if append_processed_time_column + processed_time_column = Column.new(nil, :processed_time, :timestamp, "%Y-%m-%dT%H:%M:%S%z") + columns << processed_time_column + end + + resume(task, columns, ranges.size, &control) end def self.generate_columns(metadata) columns = [ {name: "id", type: "long"}, @@ -83,34 +91,42 @@ def init @columns = task[:columns] @ranges = task[:ranges][index] @soap = MarketoApi.soap_client(task, target) + @append_processed_time_column = task[:append_processed_time_column] end def run - options = {} + options = { + retry_initial_wait_sec: task[:retry_initial_wait_sec], + retry_limit: task[:retry_limit], + } options[:batch_size] = PREVIEW_COUNT if preview? counter = 0 - @ranges.each do |range| - soap.each(range, options) do |lead| - values = @columns.map do |column| - name = column["name"].to_s - value = (lead[name] || {})[:value] - cast_value(column, value) - end + catch(:finish) do + @ranges.each do |range| + soap.each(range, options) do |lead| + values = @columns.map do |column| + name = column["name"].to_s + value = (lead[name] || {})[:value] + cast_value(column, value) + end - page_builder.add(values) - break if preview? && (counter += 1) >= PREVIEW_COUNT + if @append_processed_time_column + values << Time.parse(range["from"]) + end + + page_builder.add(values) + throw(:finish) if preview? && (counter += 1) >= PREVIEW_COUNT + end end end page_builder.finish - task_report = { - from_datetime: task[:to_datetime] - } + task_report = {} return task_report end end end end