lib/embulk/input/presto.rb in embulk-input-presto-0.1.1 vs lib/embulk/input/presto.rb in embulk-input-presto-0.1.2

- old
+ new

@@ -1,59 +1,93 @@ module Embulk - class InputPresto < InputPlugin - require 'presto-client' + module Input + class Presto < InputPlugin + require 'presto-client' - Plugin.register_input("presto", self) + Plugin.register_input("presto", self) - def self.transaction(config, &control) - task = { - "host" => config.param("host", :string, default: "localhost"), - "port" => config.param("port", :integer, default: 8080), - "schema" => config.param("schema", :string, default: "default"), - "catalog" => config.param("catalog", :string, default: "native"), - "query" => config.param("query", :string), - "user" => config.param("user", :string, default: "embulk"), - "columns" => config.param("columns", :array, default: []) - } + def self.transaction(config, &control) + task = { + "host" => config.param("host", :string, default: "localhost"), + "port" => config.param("port", :integer, default: 8080), + "schema" => config.param("schema", :string, default: "default"), + "catalog" => config.param("catalog", :string, default: "native"), + "query" => config.param("query", :string), + "user" => config.param("user", :string, default: "embulk"), + "columns" => config.param("columns", :array) + } - columns = task['columns'].each_with_index.map do |c, i| - Column.new(i, c["name"], c["type"].to_sym) + columns = task['columns'].each_with_index.map do |c, i| + Column.new(i, c["name"], c["type"].to_sym) + end + + resume(task, columns, 1, &control) end - resume(task, columns, 1, &control) - end + def self.resume(task, columns, count, &control) + task_reports = yield(task, columns, count) - def self.resume(task, columns, count, &control) - task_reports = yield(task, columns, count) + next_config_diff = {} + return next_config_diff + end - next_config_diff = {} - return next_config_diff - end + def init + @client = ::Presto::Client.new( + server: "#{task['host']}:#{task['port']}", + catalog: task['catalog'], + user: task['user'], + schema: task['schema'] + ) + @query = task["query"] - def init - @client = Presto::Client.new( - server: "#{task['host']}:#{task['port']}", - catalog: task['catalog'], - user: task['user'], - schema: task['schema'] - ) - @query = task["query"] + Embulk.logger.info "SQL: #{@query}" + end - Embulk.logger.info "SQL: #{@query}" - end + def run + size = 0 + @client.query(@query) do |q| + q.each_row {|row| + converted_values = row.map.with_index { |value,i| convert_value(value, schema[i]) } + page_builder.add(converted_values) + } + size = q.rows.size + end - def run - size = 0 - @client.query(@query) do |q| - q.each_row {|row| - page_builder.add(row) - } - size = q.rows.size + page_builder.finish + + task_report = { size: size } + return task_report end - page_builder.finish - - task_report = { size: size } - return task_report + def convert_value(value, field) + return nil if value.nil? + case field["type"] + when :string + value + when :long + value.to_i + when :double + value.to_f + when :boolean + if value.is_a?(TrueClass) || value.is_a?(FalseClass) + value + else + downcased_val = value.downcase + case downcased_val + when 'true' then true + when 'false' then false + when '1' then true + when '0' then false + else nil + end + end + when :timestamp + Time.parse(value) + when :json + value + else + raise "Unsupported type #{field['type']}" + end + end end end end