lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.3.0 vs lib/embulk/input/marketo/lead.rb in embulk-input-marketo-0.3.1

- old
+ new

@@ -2,18 +2,58 @@ module Embulk module Input module Marketo class Lead < Base - include Timeslice + TIMESLICE_COUNT_PER_TASK = 24 Plugin.register_input("marketo/lead", self) def self.target :lead end + def self.guess(config) + if config.param(:last_updated_at, :string, default: nil) + Embulk.logger.warn "config: last_updated_at is deprecated. Use from_datetime/to_datetime" + end + + client = soap_client(config) + metadata = client.metadata + + return {"columns" => generate_columns(metadata)} + end + + def self.resume(task, columns, count, &control) + task_reports = yield(task, columns, count) + + # When no task ran, task_reports is empty + return {} if task_reports.empty? + # all task returns same report as {from_datetime: to_datetime} + return task_reports.first + end + + def self.transaction(config, &control) + endpoint_url = config.param(:endpoint, :string) + + range = format_range(config) + + ranges = timeslice(range[:from], range[:to], TIMESLICE_COUNT_PER_TASK) + task = { + endpoint_url: endpoint_url, + wsdl_url: config.param(:wsdl, :string, default: "#{endpoint_url}?WSDL"), + user_id: config.param(:user_id, :string), + encryption_key: config.param(:encryption_key, :string), + from_datetime: range[:from], + to_datetime: range[:to], + ranges: ranges, + columns: config.param(:columns, :array) + } + + resume(task, embulk_columns(config), ranges.size, &control) + end + def self.generate_columns(metadata) columns = [ {name: "id", type: "long"}, {name: "email", type: "string"}, ] @@ -46,44 +86,32 @@ @ranges = task[:ranges][index] @soap = MarketoApi.soap_client(task, target) end def run - from_datetime = task[:from_datetime] - to_datetime = task[:to_datetime] || Time.now - options = {} options[:batch_size] = PREVIEW_COUNT if preview? + counter = 0 @ranges.each do |range| soap.each(range, options) do |lead| values = @columns.map do |column| name = column["name"].to_s value = (lead[name] || {})[:value] - next unless value - - case column["type"] - when "timestamp" - begin - Time.parse(value) - rescue => e - raise ConfigError, "Can't parse as Time '#{value}' (column is #{column["name"]})" - end - else - value - end + cast_value(column, value) end page_builder.add(values) + break if preview? && (counter += 1) >= PREVIEW_COUNT end end page_builder.finish - commit_report = { - from_datetime: to_datetime + task_report = { + from_datetime: task[:to_datetime] } - return commit_report + return task_report end end end end end