lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.6.0 vs lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.7.0
- old
+ new
@@ -15,19 +15,18 @@
task = {
"secret_key_id" => config.param("secret_key_id", :string),
"secret_key" => config.param("secret_key", :string),
"endpoint" => config.param("endpoint", :string, default: 'https://api.lkqd.com/reports'),
"copy_temp_to" => config.param("copy_temp_to", :string, default: nil),
- "try_convert" => config.param("try_convert", :bool, default: true),
"measurable_impressions" => config.param("measurable_impressions", :bool, default: false),
"viewable_impressions" => config.param("viewable_impressions", :bool, default: false),
"report_parameters" => config.param("report_parameters", :hash, default: {}),
}
task['authorization'] = Base64.urlsafe_encode64("#{task['secret_key_id']}:#{task['secret_key']}")
response = request_lkqd({authorization: task['authorization'], endpoint: task['endpoint'], report_parameters: task['report_parameters']})
- tempfile = Tempfile.new('emublk-input-lkqd_')
+ tempfile = Tempfile.new('embulk-input-lkqd_')
while chunk = response.body.readpartial
tempfile.write chunk
end
tempfile.close
task['tempfile_path'] = tempfile.path
@@ -76,11 +75,11 @@
def self.guess_column_type(metrics, column_name)
column_name.gsub!(/^\W/, '')
column_option = DEFAULT_COLUMNS[column_name]
if column_option
- return {type: column_option['type'].to_sym, name: column_name, format: column_option['format']}
+ return {type: column_option['type'].to_sym, name: column_name}
else
return {type: :string, name: column_name}
end
end
@@ -89,12 +88,12 @@
name, value = field
column_name = name.gsub(/^\W/, '')
column_option = DEFAULT_COLUMNS[column_name]
if column_option.nil?
next value
- elsif column_option['type'] == 'timestamp'
- next Time.strptime(value + " " + options[:timezone], column_option['format'] + " %Z").to_i
+ #elsif column_option['type'] == 'timestamp'
+ # next Time.strptime(value + " " + options[:timezone], column_option['format'] + " %Z").to_i
elsif column_option['type'] == 'long'
next value.gsub(',','').to_i
elsif column_option['type'] == 'double' && value.match(/%$/)
next value.gsub(',','').to_f / 100.0
elsif column_option['type'] == 'double'
@@ -104,32 +103,30 @@
end
end
end
def self.measurable_impressions?(task)
- task['try_convert'] && task['measurable_impressions'] &&
- task['viewability_measured_rate_index'] && task['impressions_index']
+ task['measurable_impressions'] && task['viewability_measured_rate_index'] && task['impressions_index']
end
def self.viewable_impressions?(task)
- task['try_convert'] && task['viewable_impressions'] &&
- task['viewability_rate_index'] && task['viewability_measured_rate_index'] && task['impressions_index']
+ task['viewable_impressions'] && task['viewability_rate_index'] &&
+ task['viewability_measured_rate_index'] && task['impressions_index']
end
def init
@task = task
end
def run
convert_options = {timezone: @task['report_parameters']['timezone']}
- try_convert = @task['try_convert']
viewability_measured_rate_index = @task['viewability_measured_rate_index']
viewability_rate_index = @task['viewability_rate_index']
impressions_index = @task['impressions_index']
CSV.foreach(@task['tempfile_path'], {headers: true}).each do |row|
- row = try_convert ? Lkqd.try_convert(row, convert_options) : row
+ row = Lkqd.try_convert(row, convert_options)
if Lkqd.measurable_impressions?(@task)
row << row[impressions_index] * row[viewability_measured_rate_index]
end
if Lkqd.viewable_impressions?(@task)
@@ -144,10 +141,10 @@
return task_report
end
DEFAULT_COLUMNS = {
# dimensions' columns
- 'Time'=> { 'type' => 'timestamp', 'format' => "%Y-%m-%dT%H" },
+ 'Time'=> { 'type' => 'string' },
'Account'=> { 'type' => 'string' },
'Supply Source ID'=> { 'type' => 'string' },
'Supply Source'=> { 'type' => 'string' },
'Supply Partner'=> { 'type' => 'string' },
'Environment'=> { 'type' => 'string' },