lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.6.0 vs lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.7.0

- old
+ new

@@ -15,19 +15,18 @@ task = { "secret_key_id" => config.param("secret_key_id", :string), "secret_key" => config.param("secret_key", :string), "endpoint" => config.param("endpoint", :string, default: 'https://api.lkqd.com/reports'), "copy_temp_to" => config.param("copy_temp_to", :string, default: nil), - "try_convert" => config.param("try_convert", :bool, default: true), "measurable_impressions" => config.param("measurable_impressions", :bool, default: false), "viewable_impressions" => config.param("viewable_impressions", :bool, default: false), "report_parameters" => config.param("report_parameters", :hash, default: {}), } task['authorization'] = Base64.urlsafe_encode64("#{task['secret_key_id']}:#{task['secret_key']}") response = request_lkqd({authorization: task['authorization'], endpoint: task['endpoint'], report_parameters: task['report_parameters']}) - tempfile = Tempfile.new('emublk-input-lkqd_') + tempfile = Tempfile.new('embulk-input-lkqd_') while chunk = response.body.readpartial tempfile.write chunk end tempfile.close task['tempfile_path'] = tempfile.path @@ -76,11 +75,11 @@ def self.guess_column_type(metrics, column_name) column_name.gsub!(/^\W/, '') column_option = DEFAULT_COLUMNS[column_name] if column_option - return {type: column_option['type'].to_sym, name: column_name, format: column_option['format']} + return {type: column_option['type'].to_sym, name: column_name} else return {type: :string, name: column_name} end end @@ -89,12 +88,12 @@ name, value = field column_name = name.gsub(/^\W/, '') column_option = DEFAULT_COLUMNS[column_name] if column_option.nil? next value - elsif column_option['type'] == 'timestamp' - next Time.strptime(value + " " + options[:timezone], column_option['format'] + " %Z").to_i + #elsif column_option['type'] == 'timestamp' + # next Time.strptime(value + " " + options[:timezone], column_option['format'] + " %Z").to_i elsif column_option['type'] == 'long' next value.gsub(',','').to_i elsif column_option['type'] == 'double' && value.match(/%$/) next value.gsub(',','').to_f / 100.0 elsif column_option['type'] == 'double' @@ -104,32 +103,30 @@ end end end def self.measurable_impressions?(task) - task['try_convert'] && task['measurable_impressions'] && - task['viewability_measured_rate_index'] && task['impressions_index'] + task['measurable_impressions'] && task['viewability_measured_rate_index'] && task['impressions_index'] end def self.viewable_impressions?(task) - task['try_convert'] && task['viewable_impressions'] && - task['viewability_rate_index'] && task['viewability_measured_rate_index'] && task['impressions_index'] + task['viewable_impressions'] && task['viewability_rate_index'] && + task['viewability_measured_rate_index'] && task['impressions_index'] end def init @task = task end def run convert_options = {timezone: @task['report_parameters']['timezone']} - try_convert = @task['try_convert'] viewability_measured_rate_index = @task['viewability_measured_rate_index'] viewability_rate_index = @task['viewability_rate_index'] impressions_index = @task['impressions_index'] CSV.foreach(@task['tempfile_path'], {headers: true}).each do |row| - row = try_convert ? Lkqd.try_convert(row, convert_options) : row + row = Lkqd.try_convert(row, convert_options) if Lkqd.measurable_impressions?(@task) row << row[impressions_index] * row[viewability_measured_rate_index] end if Lkqd.viewable_impressions?(@task) @@ -144,10 +141,10 @@ return task_report end DEFAULT_COLUMNS = { # dimensions' columns - 'Time'=> { 'type' => 'timestamp', 'format' => "%Y-%m-%dT%H" }, + 'Time'=> { 'type' => 'string' }, 'Account'=> { 'type' => 'string' }, 'Supply Source ID'=> { 'type' => 'string' }, 'Supply Source'=> { 'type' => 'string' }, 'Supply Partner'=> { 'type' => 'string' }, 'Environment'=> { 'type' => 'string' },