lib/mosql/schema.rb in mosql-0.2.0 vs lib/mosql/schema.rb in mosql-0.3.0

- old
+ new

# Reconstructed from a whitespace-collapsed unified diff of
# lib/mosql/schema.rb (mosql 0.2.0 -> 0.3.0). Only the post-change
# (0.3.0) side of the visible hunks is reproduced; code that falls
# between hunks is marked with "hunk boundary" comments. Definitions
# whose head or tail was cut off by a hunk edge are flagged with
# NOTE(review) -- confirm those against the full 0.3.0 file.

# Normalize a column-list from the YAML collection map into an array of
# { :source, :type, :name } hashes.
#
# NOTE(review): `def to_array(lst)` begins above the first hunk; the
# signature is inferred from the visible body -- confirm.
def to_array(lst)
  array = []
  lst.each do |ent|
    if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String)
      # New configuration format: explicit :source and :type keys plus
      # one extra key naming the SQL column.
      array << {
        :source => ent.fetch(:source),
        :type   => ent.fetch(:type),
        :name   => (ent.keys - [:source, :type]).first,
      }
    elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
      # Old one-pair format { name => type }: source defaults to name.
      array << {
        :source => ent.first.first,
        :name   => ent.first.first,
        :type   => ent.first.last
      }
    else
      raise SchemaError.new("Invalid ordered hash entry #{ent.inspect}")
    end
  end
  array
end

# Reject column lists that map the same Mongo source field twice for a
# given namespace; raises SchemaError on the first duplicate.
def check_columns!(ns, spec)
  seen = Set.new
  spec[:columns].each do |col|
    if seen.include?(col[:source])
      raise SchemaError.new("Duplicate source #{col[:source]} in column definition #{col[:name]} for #{ns}.")
    end
    seen.add(col[:source])
  end
end

# Parse one collection spec: normalize :columns (required -- fetch
# raises KeyError if absent, which initialize wraps) and validate.
def parse_spec(ns, spec)
  out = spec.dup
  out[:columns] = to_array(spec.fetch(:columns))
  check_columns!(ns, out)
  out
end

# Normalize per-database :meta. :alias may be absent, a single pattern,
# or a list; it always comes out as an Array of Regexp.
def parse_meta(meta)
  meta = {} if meta.nil?
  meta[:alias] = [] unless meta.key?(:alias)
  meta[:alias] = [meta[:alias]] unless meta[:alias].is_a?(Array)
  meta[:alias] = meta[:alias].map { |r| Regexp.new(r) }
  meta
end

# Build @map from the collection-map hash: one entry per database,
# holding its parsed :meta plus one parsed spec per collection.
# Non-String keys (e.g. :meta itself) are skipped as collection names.
def initialize(map)
  @map = {}
  map.each do |dbname, db|
    @map[dbname] = { :meta => parse_meta(db[:meta]) }
    db.each do |cname, spec|
      next unless cname.is_a?(String)
      begin
        @map[dbname][cname] = parse_spec("#{dbname}.#{cname}", spec)
      rescue KeyError => e
        # fetch() in parse_spec/to_array signals a missing required key.
        raise SchemaError.new("In spec for #{dbname}.#{cname}: #{e}")
      end
    end
  end
end

# Create one SQL table per mapped collection via Sequel. clobber=true
# drops and recreates (create_table!); otherwise create-if-missing.
def create_schema(db, clobber=false)
  @map.values.each do |dbspec|
    dbspec.each do |n, collection|
      next unless n.is_a?(String)   # skip the :meta entry
      meta = collection[:meta]
      log.info("Creating table '#{meta[:table]}'...")
      db.send(clobber ? :create_table! : :create_table?, meta[:table]) do
        collection[:columns].each do |col|
          opts = {}
          if col[:source] == '$timestamp'
            # $timestamp columns default to now() server-side; COPY
            # skips them (see all_columns_for_copy / quote_copy).
            opts[:default] = Sequel.function(:now)
          end
          column col[:name], col[:type], opts
          if col[:source].to_sym == :_id
            primary_key [col[:name].to_sym]
          end
        end
        if meta[:extra_props]
          column '_extra_props', 'TEXT'
        end
      end
    end
  end
end

# Resolve a database name to its spec, consulting :alias regexes on a
# miss. The result (possibly nil) is memoized into @map so repeated
# lookups for unmapped databases stay cheap.
def find_db(db)
  unless @map.key?(db)
    @map[db] = @map.values.find do |spec|
      spec && spec[:meta][:alias].any? { |a| a.match(db) }
    end
  end
  @map[db]
end

# Look up the spec for a "db.collection" namespace; nil when unmapped.
def find_ns(ns)
  db, collection = ns.split(".")
  unless spec = find_db(db)
    return nil
  end
  unless schema = spec[collection]
    log.debug("No mapping for ns: #{ns}")
    return nil
  end
  schema
end

# -- hunk boundary: find_ns! / fetch_and_delete_dotted and anything
# -- else between @@ -8,78 @@ and @@ -107,58 @@ is not visible here.

# Map a "$"-prefixed pseudo-source to its SQL value. Only $timestamp
# (server-side now()) is defined; anything else is a schema error.
def fetch_special_source(obj, source)
  case source
  when "$timestamp"
    Sequel.function(:now)
  else
    raise SchemaError.new("Unknown source: #{source}")
  end
end

# Turn one Mongo document into a row array ordered per the schema's
# columns, with _extra_props JSON appended when configured.
def transform(ns, obj, schema=nil)
  schema ||= find_ns!(ns)
  obj = obj.dup
  row = []
  schema[:columns].each do |col|
    source = col[:source]
    type = col[:type]
    if source.start_with?("$")
      v = fetch_special_source(obj, source)
    else
      v = fetch_and_delete_dotted(obj, source)
      case v
      when BSON::Binary, BSON::ObjectId, Symbol
        v = v.to_s
      when Hash, Array
        v = JSON.dump(v)
      end
    end
    row << v
  end

  if schema[:meta][:extra_props]
    # Kludgily delete binary blobs from _extra_props -- they may
    # contain invalid UTF-8, which to_json will not properly encode.
    extra = {}
    obj.each do |k, v|
      case v
      when BSON::Binary
        next
      when Float
        # NaN is illegal in JSON. Translate into null.
        v = nil if v.nan?
      end
      extra[k] = v
    end
    row << JSON.dump(extra)
  end

  log.debug { "Transformed: #{row.inspect}" }
  row
end

# True unless the column is the $timestamp pseudo-column, which is
# filled by its SQL default rather than by COPY.
def copy_column?(col)
  col[:source] != '$timestamp'
end

# Ordered SQL column names for a schema; with copy=true, columns that
# COPY must not supply (per copy_column?) are omitted.
def all_columns(schema, copy=false)
  cols = []
  schema[:columns].each do |col|
    cols << col[:name] unless copy && !copy_column?(col)
  end
  if schema[:meta][:extra_props]
    cols << "_extra_props"
  end
  cols
end

# Column list for the COPY statement (excludes $timestamp columns).
def all_columns_for_copy(schema)
  all_columns(schema, true)
end

# Bulk-load objs into the namespace's table with COPY ... FROM STDIN.
def copy_data(db, ns, objs)
  schema = find_ns!(ns)
  db.synchronize do |pg|
    sql = "COPY \"#{schema[:meta][:table]}\" " +
      "(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"
    pg.execute(sql)
    objs.each do |o|
      pg.put_copy_data(transform_to_copy(ns, o, schema) + "\n")
    end
    pg.put_copy_end
    # NOTE(review): the diff hunk ends at put_copy_end; whatever follows
    # in copy_data (e.g. error/result handling) is not visible here.
  end
end

# Escape one value for PostgreSQL COPY text format: \N for NULL, t/f
# for booleans, nil (dropped by transform_to_copy) for SQL functions,
# and backslash-escaped tab/newline/CR/backslash otherwise.
#
# NOTE(review): the `def quote_copy(val) / case val / when nil` head is
# cut off at the hunk boundary; reconstructed from the visible branches.
def quote_copy(val)
  case val
  when nil
    "\\N"
  when true
    't'
  when false
    'f'
  when Sequel::SQL::Function
    # $timestamp values are server-side defaults; COPY omits the column
    # entirely, so emit nothing (compacted out below).
    nil
  else
    val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1')
  end
end

# Render a transformed row as one tab-separated COPY line, dropping
# function-valued cells (quote_copy returns nil for those).
def transform_to_copy(ns, row, schema=nil)
  row.map { |c| quote_copy(c) }.compact.join("\t")
end

# SQL table name for a namespace; raises (via find_ns!) when unmapped.
def table_for_ns(ns)
  find_ns!(ns)[:meta][:table]
end