lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.2.2 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.2.3

- old
+ new

@@ -39,39 +39,51 @@ end columns end + def init + @last_updated_at = task[:last_updated_at] + @columns = task[:columns] + @ranges = task[:ranges][index] + @soap = MarketoApi.soap_client(task, target) + end + def run - count = 0 from_datetime = task[:from_datetime] - to_datetime = task[:to_datetime] + to_datetime = task[:to_datetime] || Time.now + options = {} options[:batch_size] = PREVIEW_COUNT if preview? - soap.each(from_datetime, to_datetime, options) do |lead| - values = @columns.map do |column| - name = column["name"].to_s - value = (lead[name] || {})[:value] - next unless value + @ranges.each do |range| + soap.each(range, options) do |lead| + values = @columns.map do |column| + name = column["name"].to_s + value = (lead[name] || {})[:value] + next unless value - case column["type"] - when "timestamp" - Time.parse(value) - else - value + case column["type"] + when "timestamp" + begin + Time.parse(value) + rescue => e + raise ConfigError, "Can't parse as Time '#{value}' (column is #{column["name"]})" + end + else + value + end end - end - page_builder.add(values) - - count += 1 - break if preview? && count >= PREVIEW_COUNT + page_builder.add(values) + end end page_builder.finish - commit_report = {from_datetime: to_datetime} + commit_report = { + from_datetime: to_datetime + } return commit_report end end end end