lib/embulk/input/presto.rb in embulk-input-presto-0.1.2 vs lib/embulk/input/presto.rb in embulk-input-presto-0.2.0
- old
+ new
@@ -1,5 +1,9 @@
+require_relative 'presto/type_converter'
+require_relative 'presto/explain_parser'
+require_relative 'presto/connection'
+
module Embulk
module Input
class Presto < InputPlugin
require 'presto-client'
@@ -11,15 +15,19 @@
"port" => config.param("port", :integer, default: 8080),
"schema" => config.param("schema", :string, default: "default"),
"catalog" => config.param("catalog", :string, default: "native"),
"query" => config.param("query", :string),
"user" => config.param("user", :string, default: "embulk"),
- "columns" => config.param("columns", :array)
+ "columns" => config.param("columns", :array, default: nil)
}
- columns = task['columns'].each_with_index.map do |c, i|
- Column.new(i, c["name"], c["type"].to_sym)
+ columns = if task['columns']
+ task['columns'].each_with_index.map do |c, i|
+ Column.new(i, c["name"], c["type"].to_sym)
+ end
+ else
+ build_output_columns(task)
end
resume(task, columns, 1, &control)
end
@@ -28,66 +36,43 @@
next_config_diff = {}
return next_config_diff
end
+ # Derives the output schema from Presto when no "columns" are configured.
+ #
+ # Runs an EXPLAIN (FORMAT TEXT) statement for the configured query, passes
+ # the result to ExplainParser.parse, and builds one Column per (name, type)
+ # pair, mapping each type through TypeConverter.get_type.
+ #
+ # @param task [Hash] plugin task; reads task["query"] and hands the whole
+ #   task to Connection.get_client
+ # @return [Array<Column>] columns indexed in the order the parser yields them
+ def self.build_output_columns(task)
+   # Build the statement once so the SQL that is logged is exactly the SQL
+   # that is executed.
+   explain_query = "explain (FORMAT TEXT) " + task["query"]
+   Embulk.logger.debug("SQL: #{explain_query}")
+   explain_result = Connection.get_client(task).run(explain_query)
+
+   ExplainParser.parse(explain_result).each_with_index.map do |(name, type), i|
+     Column.new(i, name, TypeConverter.get_type(type))
+   end
+ end
+
def init
- @client = ::Presto::Client.new(
- server: "#{task['host']}:#{task['port']}",
- catalog: task['catalog'],
- user: task['user'],
- schema: task['schema']
- )
+ @client = Connection.get_client(task)
@query = task["query"]
+ @type_converter = TypeConverter.new
Embulk.logger.info "SQL: #{@query}"
end
def run
size = 0
@client.query(@query) do |q|
q.each_row {|row|
- converted_values = row.map.with_index { |value,i| convert_value(value, schema[i]) }
+ converted_values = row.map.with_index { |value,i| @type_converter.convert_value(value, schema[i]) }
page_builder.add(converted_values)
}
size = q.rows.size
end
page_builder.finish
task_report = { size: size }
return task_report
- end
-
- def convert_value(value, field)
- return nil if value.nil?
- case field["type"]
- when :string
- value
- when :long
- value.to_i
- when :double
- value.to_f
- when :boolean
- if value.is_a?(TrueClass) || value.is_a?(FalseClass)
- value
- else
- downcased_val = value.downcase
- case downcased_val
- when 'true' then true
- when 'false' then false
- when '1' then true
- when '0' then false
- else nil
- end
- end
- when :timestamp
- Time.parse(value)
- when :json
- value
- else
- raise "Unsupported type #{field['type']}"
- end
end
end
end
end