lib/rbhive/schema_definition.rb in sequel-impala-1.0.1 vs lib/rbhive/schema_definition.rb in sequel-impala-1.1.0

- old
+ new

@@ -1,82 +1,82 @@ require 'json' module RBHive class SchemaDefinition attr_reader :schema - + NAN = Float::NAN rescue 0.0/0.0 INFINITY = Float::INFINITY rescue 1.0/0.0 - TYPES = { + TYPES = { :boolean => :to_s, :string => :to_s, :bigint => :to_i, :float => :to_f, :double => :to_f, :int => :to_i, :smallint => :to_i, :tinyint => :to_i, } - + def initialize(schema, example_row) @schema = schema @example_row = example_row ? example_row.split("\t") : [] end - + def column_names @column_names ||= begin schema_names = @schema.fieldSchemas.map {|c| c.name } - + # In rare cases Hive can return two identical column names # consider SELECT a.foo, b.foo... # in this case you get two columns called foo with no disambiguation. # as a (far from ideal) solution we detect this edge case and rename them # a.foo => foo1, b.foo => foo2 # otherwise we will trample one of the columns during Hash mapping. s = Hash.new(0) schema_names.map! { |c| s[c] += 1; s[c] > 1 ? "#{c}---|---#{s[c]}" : c } schema_names.map! { |c| s[c] > 1 ? "#{c}---|---1" : c } schema_names.map! { |c| c.gsub('---|---', '_').to_sym } - + # Lets fix the fact that Hive doesn't return schema data for partitions on SELECT * queries # For now we will call them :_p1, :_p2, etc. to avoid collisions. offset = 0 while schema_names.length < @example_row.length schema_names.push(:"_p#{offset+=1}") end schema_names end end - + def column_type_map - @column_type_map ||= column_names.inject({}) do |hsh, c| + @column_type_map ||= column_names.inject({}) do |hsh, c| definition = @schema.fieldSchemas.find {|s| s.name.to_sym == c } # If the column isn't in the schema (eg partitions in SELECT * queries) assume they are strings hsh[c] = definition ? definition.type.to_sym : :string hsh end end - + def coerce_row(row) column_names.zip(row.split("\t")).inject({}) do |hsh, (column_name, value)| hsh[column_name] = coerce_column(column_name, value) hsh end end - + def coerce_column(column_name, value) type = column_type_map[column_name] return INFINITY if (type != :string && value == "Infinity") return NAN if (type != :string && value == "NaN") return coerce_complex_value(value) if type.to_s =~ /^array/ conversion_method = TYPES[type] conversion_method ? value.send(conversion_method) : value end - + def coerce_row_to_array(row) column_names.map { |n| row[n] } end - + def coerce_complex_value(value) return nil if value.nil? return nil if value.length == 0 return nil if value == 'null' JSON.parse(value)