lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.5.0 vs lib/embulk/input/lkqd.rb in embulk-input-lkqd-0.6.0

- old
+ new

@@ -14,11 +14,14 @@ def self.transaction(config, &control) task = { "secret_key_id" => config.param("secret_key_id", :string), "secret_key" => config.param("secret_key", :string), "endpoint" => config.param("endpoint", :string, default: 'https://api.lkqd.com/reports'), - "try_convert" => config.param("try_convert", :string, default: "true"), + "copy_temp_to" => config.param("copy_temp_to", :string, default: nil), + "try_convert" => config.param("try_convert", :bool, default: true), + "measurable_impressions" => config.param("measurable_impressions", :bool, default: false), + "viewable_impressions" => config.param("viewable_impressions", :bool, default: false), "report_parameters" => config.param("report_parameters", :hash, default: {}), } task['authorization'] = Base64.urlsafe_encode64("#{task['secret_key_id']}:#{task['secret_key']}") response = request_lkqd({authorization: task['authorization'], endpoint: task['endpoint'], report_parameters: task['report_parameters']}) @@ -27,16 +30,34 @@ tempfile.write chunk end tempfile.close task['tempfile_path'] = tempfile.path + FileUtils.cp(task['tempfile_path'], task['copy_temp_to']) if task['copy_temp_to'] + columns = [] ::CSV.foreach(tempfile.path).first.each_with_index do |column_name, index| column_type = guess_column_type(task['report_parameters']['metrics'], column_name) - columns << Column.new({index: index}.merge(column_type)) + column = Column.new({index: index}.merge(column_type)) + if column.name == 'Viewability Measured Rate' + task['viewability_measured_rate_index'] = index + elsif column.name == 'Viewability Rate' + task['viewability_rate_index'] = index + elsif column.name == 'Impressions' + task['impressions_index'] = index + end + columns << column end + if measurable_impressions?(task) + columns << Column.new({index: columns.size, type: :double, name: 'Measurable Impressions'}) + end + + if viewable_impressions?(task) + columns << Column.new({index: columns.size, type: :double, name: 'Viewable Impressions'}) + end + resume(task, columns, 1, &control) end def self.resume(task, columns, count, &control) task_reports = yield(task, columns, count) @@ -82,18 +103,40 @@ next value end end end + def self.measurable_impressions?(task) + task['try_convert'] && task['measurable_impressions'] && + task['viewability_measured_rate_index'] && task['impressions_index'] + end + + def self.viewable_impressions?(task) + task['try_convert'] && task['viewable_impressions'] && + task['viewability_rate_index'] && task['viewability_measured_rate_index'] && task['impressions_index'] + end + def init @task = task end def run convert_options = {timezone: @task['report_parameters']['timezone']} + try_convert = @task['try_convert'] + viewability_measured_rate_index = @task['viewability_measured_rate_index'] + viewability_rate_index = @task['viewability_rate_index'] + impressions_index = @task['impressions_index'] + CSV.foreach(@task['tempfile_path'], {headers: true}).each do |row| - row = @task['try_convert'] == "true" ? Lkqd.try_convert(row, convert_options) : row + row = try_convert ? Lkqd.try_convert(row, convert_options) : row + if Lkqd.measurable_impressions?(@task) + row << row[impressions_index] * row[viewability_measured_rate_index] + end + + if Lkqd.viewable_impressions?(@task) + row << row[impressions_index] * row[viewability_measured_rate_index] * row[viewability_rate_index] + end page_builder.add(row) end page_builder.finish FileUtils.rm_rf(@task['tempfile_path']) @@ -138,9 +181,10 @@ 'Profit' => { 'type' => 'double' }, 'Profit Margin' => { 'type' => 'double' }, 'Clicks' => { 'type' => 'long' }, 'CTR' => { 'type' => 'double' }, 'Ad Starts' => { 'type' => 'long' }, + 'Ad Starts Rate' => { 'type' => 'double' }, '25% Views' => { 'type' => 'long' }, '50% Views' => { 'type' => 'long' }, '75% Views' => { 'type' => 'long' }, '100% Views' => { 'type' => 'long' }, '25% View Rate' => { 'type' => 'double' },