lib/embulk/input/presto.rb in embulk-input-presto-0.1.2 vs lib/embulk/input/presto.rb in embulk-input-presto-0.2.0

- old
+ new

@@ -1,5 +1,9 @@ +require_relative 'presto/type_converter' +require_relative 'presto/explain_parser' +require_relative 'presto/connection' + module Embulk module Input class Presto < InputPlugin require 'presto-client' @@ -11,15 +15,19 @@ "port" => config.param("port", :integer, default: 8080), "schema" => config.param("schema", :string, default: "default"), "catalog" => config.param("catalog", :string, default: "native"), "query" => config.param("query", :string), "user" => config.param("user", :string, default: "embulk"), - "columns" => config.param("columns", :array) + "columns" => config.param("columns", :array, default: nil) } - columns = task['columns'].each_with_index.map do |c, i| - Column.new(i, c["name"], c["type"].to_sym) + columns = if task['columns'] + task['columns'].each_with_index.map do |c, i| + Column.new(i, c["name"], c["type"].to_sym) + end + else + build_output_columns(task) end resume(task, columns, 1, &control) end @@ -28,66 +36,43 @@ next_config_diff = {} return next_config_diff end + def self.build_output_columns(task) + explain_query = "explain (FORMAT TEXT) " + task["query"] + Embulk.logger.debug("SQL: #{explain_query}") + explain_result = Connection.get_client(task).run("explain (FORMAT TEXT) " + task["query"]) + + columns = [] + ExplainParser.parse(explain_result).each_with_index do |(name, type), i| + columns << Column.new(i, name, TypeConverter.get_type(type)) + end + columns + end + def init - @client = ::Presto::Client.new( - server: "#{task['host']}:#{task['port']}", - catalog: task['catalog'], - user: task['user'], - schema: task['schema'] - ) + @client = Connection.get_client(task) @query = task["query"] + @type_converter = TypeConverter.new Embulk.logger.info "SQL: #{@query}" end def run size = 0 @client.query(@query) do |q| q.each_row {|row| - converted_values = row.map.with_index { |value,i| convert_value(value, schema[i]) } + converted_values = row.map.with_index { |value,i| @type_converter.convert_value(value, schema[i]) } page_builder.add(converted_values) } size = q.rows.size end page_builder.finish task_report = { size: size } return task_report - end - - def convert_value(value, field) - return nil if value.nil? - case field["type"] - when :string - value - when :long - value.to_i - when :double - value.to_f - when :boolean - if value.is_a?(TrueClass) || value.is_a?(FalseClass) - value - else - downcased_val = value.downcase - case downcased_val - when 'true' then true - when 'false' then false - when '1' then true - when '0' then false - else nil - end - end - when :timestamp - Time.parse(value) - when :json - value - else - raise "Unsupported type #{field['type']}" - end end end end end