lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.3.0 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.3.1
- old
+ new
@@ -2,18 +2,58 @@
module Embulk
module Input
module Marketo
class Lead < Base
- include Timeslice
+ TIMESLICE_COUNT_PER_TASK = 24
Plugin.register_input("marketo/lead", self)
def self.target
:lead
end
+ def self.guess(config)
+ if config.param(:last_updated_at, :string, default: nil)
+ Embulk.logger.warn "config: last_updated_at is deprecated. Use from_datetime/to_datetime"
+ end
+
+ client = soap_client(config)
+ metadata = client.metadata
+
+ return {"columns" => generate_columns(metadata)}
+ end
+
+ def self.resume(task, columns, count, &control)
+ task_reports = yield(task, columns, count)
+
+ # When no task ran, task_reports is empty
+ return {} if task_reports.empty?
+ # all task returns same report as {from_datetime: to_datetime}
+ return task_reports.first
+ end
+
+ def self.transaction(config, &control)
+ endpoint_url = config.param(:endpoint, :string)
+
+ range = format_range(config)
+
+ ranges = timeslice(range[:from], range[:to], TIMESLICE_COUNT_PER_TASK)
+ task = {
+ endpoint_url: endpoint_url,
+ wsdl_url: config.param(:wsdl, :string, default: "#{endpoint_url}?WSDL"),
+ user_id: config.param(:user_id, :string),
+ encryption_key: config.param(:encryption_key, :string),
+ from_datetime: range[:from],
+ to_datetime: range[:to],
+ ranges: ranges,
+ columns: config.param(:columns, :array)
+ }
+
+ resume(task, embulk_columns(config), ranges.size, &control)
+ end
+
def self.generate_columns(metadata)
columns = [
{name: "id", type: "long"},
{name: "email", type: "string"},
]
@@ -46,44 +86,32 @@
@ranges = task[:ranges][index]
@soap = MarketoApi.soap_client(task, target)
end
def run
- from_datetime = task[:from_datetime]
- to_datetime = task[:to_datetime] || Time.now
-
options = {}
options[:batch_size] = PREVIEW_COUNT if preview?
+ counter = 0
@ranges.each do |range|
soap.each(range, options) do |lead|
values = @columns.map do |column|
name = column["name"].to_s
value = (lead[name] || {})[:value]
- next unless value
-
- case column["type"]
- when "timestamp"
- begin
- Time.parse(value)
- rescue => e
- raise ConfigError, "Can't parse as Time '#{value}' (column is #{column["name"]})"
- end
- else
- value
- end
+ cast_value(column, value)
end
page_builder.add(values)
+ break if preview? && (counter += 1) >= PREVIEW_COUNT
end
end
page_builder.finish
- commit_report = {
- from_datetime: to_datetime
+ task_report = {
+ from_datetime: task[:to_datetime]
}
- return commit_report
+ return task_report
end
end
end
end
end