lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.2.2 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.2.3
- old
+ new
@@ -39,39 +39,51 @@
end
columns
end
+ def init
+ @last_updated_at = task[:last_updated_at]
+ @columns = task[:columns]
+ @ranges = task[:ranges][index]
+ @soap = MarketoApi.soap_client(task, target)
+ end
+
def run
- count = 0
from_datetime = task[:from_datetime]
- to_datetime = task[:to_datetime]
+ to_datetime = task[:to_datetime] || Time.now
+
options = {}
options[:batch_size] = PREVIEW_COUNT if preview?
- soap.each(from_datetime, to_datetime, options) do |lead|
- values = @columns.map do |column|
- name = column["name"].to_s
- value = (lead[name] || {})[:value]
- next unless value
+ @ranges.each do |range|
+ soap.each(range, options) do |lead|
+ values = @columns.map do |column|
+ name = column["name"].to_s
+ value = (lead[name] || {})[:value]
+ next unless value
- case column["type"]
- when "timestamp"
- Time.parse(value)
- else
- value
+ case column["type"]
+ when "timestamp"
+ begin
+ Time.parse(value)
+ rescue => e
+ raise ConfigError, "Can't parse as Time '#{value}' (column is #{column["name"]})"
+ end
+ else
+ value
+ end
end
- end
- page_builder.add(values)
-
- count += 1
- break if preview? && count >= PREVIEW_COUNT
+ page_builder.add(values)
+ end
end
page_builder.finish
- commit_report = {from_datetime: to_datetime}
+ commit_report = {
+ from_datetime: to_datetime
+ }
return commit_report
end
end
end
end