lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.5.0 vs lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.6.0
- old
+ new
@@ -14,11 +14,14 @@
def self.transaction(config, &control)
task = {
"secret_key_id" => config.param("secret_key_id", :string),
"secret_key" => config.param("secret_key", :string),
"endpoint" => config.param("endpoint", :string, default: 'https://api.lkqd.com/reports'),
- "try_convert" => config.param("try_convert", :string, default: "true"),
+ "copy_temp_to" => config.param("copy_temp_to", :string, default: nil),
+ "try_convert" => config.param("try_convert", :bool, default: true),
+ "measurable_impressions" => config.param("measurable_impressions", :bool, default: false),
+ "viewable_impressions" => config.param("viewable_impressions", :bool, default: false),
"report_parameters" => config.param("report_parameters", :hash, default: {}),
}
task['authorization'] = Base64.urlsafe_encode64("#{task['secret_key_id']}:#{task['secret_key']}")
response = request_lkqd({authorization: task['authorization'], endpoint: task['endpoint'], report_parameters: task['report_parameters']})
@@ -27,16 +30,34 @@
tempfile.write chunk
end
tempfile.close
task['tempfile_path'] = tempfile.path
+ FileUtils.cp(task['tempfile_path'], task['copy_temp_to']) if task['copy_temp_to']
+
columns = []
::CSV.foreach(tempfile.path).first.each_with_index do |column_name, index|
column_type = guess_column_type(task['report_parameters']['metrics'], column_name)
- columns << Column.new({index: index}.merge(column_type))
+ column = Column.new({index: index}.merge(column_type))
+ if column.name == 'Viewability Measured Rate'
+ task['viewability_measured_rate_index'] = index
+ elsif column.name == 'Viewability Rate'
+ task['viewability_rate_index'] = index
+ elsif column.name == 'Impressions'
+ task['impressions_index'] = index
+ end
+ columns << column
end
+ if measurable_impressions?(task)
+ columns << Column.new({index: columns.size, type: :double, name: 'Measurable Impressions'})
+ end
+
+ if viewable_impressions?(task)
+ columns << Column.new({index: columns.size, type: :double, name: 'Viewable Impressions'})
+ end
+
resume(task, columns, 1, &control)
end
def self.resume(task, columns, count, &control)
task_reports = yield(task, columns, count)
@@ -82,18 +103,40 @@
next value
end
end
end
+ def self.measurable_impressions?(task)
+ task['try_convert'] && task['measurable_impressions'] &&
+ task['viewability_measured_rate_index'] && task['impressions_index']
+ end
+
+ def self.viewable_impressions?(task)
+ task['try_convert'] && task['viewable_impressions'] &&
+ task['viewability_rate_index'] && task['viewability_measured_rate_index'] && task['impressions_index']
+ end
+
def init
@task = task
end
def run
convert_options = {timezone: @task['report_parameters']['timezone']}
+ try_convert = @task['try_convert']
+ viewability_measured_rate_index = @task['viewability_measured_rate_index']
+ viewability_rate_index = @task['viewability_rate_index']
+ impressions_index = @task['impressions_index']
+
CSV.foreach(@task['tempfile_path'], {headers: true}).each do |row|
- row = @task['try_convert'] == "true" ? Lkqd.try_convert(row, convert_options) : row
+ row = try_convert ? Lkqd.try_convert(row, convert_options) : row
+ if Lkqd.measurable_impressions?(@task)
+ row << row[impressions_index] * row[viewability_measured_rate_index]
+ end
+
+ if Lkqd.viewable_impressions?(@task)
+ row << row[impressions_index] * row[viewability_measured_rate_index] * row[viewability_rate_index]
+ end
page_builder.add(row)
end
page_builder.finish
FileUtils.rm_rf(@task['tempfile_path'])
@@ -138,9 +181,10 @@
'Profit' => { 'type' => 'double' },
'Profit Margin' => { 'type' => 'double' },
'Clicks' => { 'type' => 'long' },
'CTR' => { 'type' => 'double' },
'Ad Starts' => { 'type' => 'long' },
+ 'Ad Starts Rate' => { 'type' => 'double' },
'25% Views' => { 'type' => 'long' },
'50% Views' => { 'type' => 'long' },
'75% Views' => { 'type' => 'long' },
'100% Views' => { 'type' => 'long' },
'25% View Rate' => { 'type' => 'double' },