lib/mosql/schema.rb in mosql-0.1.2 vs lib/mosql/schema.rb in mosql-0.2.0

- old
+ new

@@ -2,48 +2,76 @@
   class SchemaError < StandardError; end;

   class Schema
     include MoSQL::Logging

-    def to_ordered_hash(lst)
-      hash = BSON::OrderedHash.new
+    def to_array(lst)
+      array = []
       lst.each do |ent|
-        raise "Invalid ordered hash entry #{ent.inspect}" unless ent.is_a?(Hash) && ent.keys.length == 1
-        field, type = ent.first
-        hash[field] = type
+        if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String)
+          # new configuration format
+          array << {
+            :source => ent.delete(:source),
+            :type => ent.delete(:type),
+            :name => ent.first.first,
+          }
+        elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
+          array << {
+            :source => ent.first.first,
+            :name => ent.first.first,
+            :type => ent.first.last
+          }
+        else
+          raise "Invalid ordered hash entry #{ent.inspect}"
+        end
+
       end
-      hash
+      array
     end

-    def parse_spec(spec)
+    def check_columns!(ns, spec)
+      seen = Set.new
+      spec[:columns].each do |col|
+        if seen.include?(col[:source])
+          raise "Duplicate source #{col[:source]} in column definition #{col[:name]} for #{ns}."
+        end
+        seen.add(col[:source])
+      end
+    end
+
+    def parse_spec(ns, spec)
       out = spec.dup
-      out[:columns] = to_ordered_hash(spec[:columns])
+      out[:columns] = to_array(spec[:columns])
+      check_columns!(ns, out)
       out
     end

     def initialize(map)
       @map = {}
       map.each do |dbname, db|
         @map[dbname] ||= {}
         db.each do |cname, spec|
-          @map[dbname][cname] = parse_spec(spec)
+          @map[dbname][cname] = parse_spec("#{dbname}.#{cname}", spec)
         end
       end
     end

     def create_schema(db, clobber=false)
       @map.values.map(&:values).flatten.each do |collection|
         meta = collection[:meta]
         log.info("Creating table '#{meta[:table]}'...")
         db.send(clobber ? :create_table! : :create_table?, meta[:table]) do
-          collection[:columns].each do |field, type|
-            column field, type
+          collection[:columns].each do |col|
+            column col[:name], col[:type]
+
+            if col[:source].to_sym == :_id
+              primary_key [col[:name].to_sym]
+            end
           end
           if meta[:extra_props]
             column '_extra_props', 'TEXT'
           end
-          primary_key [:_id]
         end
       end
     end

     def find_ns(ns)
@@ -60,17 +88,40 @@
       schema = find_ns(ns)
       raise SchemaError.new("No mapping for namespace: #{ns}") if schema.nil?
       schema
     end

+    def fetch_and_delete_dotted(obj, dotted)
+      pieces = dotted.split(".")
+      breadcrumbs = []
+      while pieces.length > 1
+        key = pieces.shift
+        breadcrumbs << [obj, key]
+        obj = obj[key]
+        return nil unless obj.is_a?(Hash)
+      end
+
+      val = obj.delete(pieces.first)
+
+      breadcrumbs.reverse.each do |obj, key|
+        obj.delete(key) if obj[key].empty?
+      end
+
+      val
+    end
+
     def transform(ns, obj, schema=nil)
       schema ||= find_ns!(ns)

       obj = obj.dup
       row = []
-      schema[:columns].each do |name, type|
-        v = obj.delete(name)
+      schema[:columns].each do |col|
+
+        source = col[:source]
+        type = col[:type]
+
+        v = fetch_and_delete_dotted(obj, source)
         case v
         when BSON::Binary, BSON::ObjectId
           v = v.to_s
         end
         row << v
@@ -89,20 +140,22 @@

       row
     end

     def all_columns(schema)
-      cols = schema[:columns].keys
+      cols = []
+      schema[:columns].each do |col|
+        cols << col[:name]
+      end
       if schema[:meta][:extra_props]
         cols << "_extra_props"
       end
       cols
     end

     def copy_data(db, ns, objs)
       schema = find_ns!(ns)
-      data = objs.map { |o| transform_to_copy(ns, o, schema) }.join("\n")
       db.synchronize do |pg|
         sql = "COPY \"#{schema[:meta][:table]}\" " +
           "(#{all_columns(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"
         pg.execute(sql)
         objs.each do |o|
@@ -142,8 +195,12 @@
       @map.keys
     end

     def collections_for_mongo_db(db)
       (@map[db]||{}).keys
+    end
+
+    def primary_sql_key_for_ns(ns)
+      find_ns!(ns)[:columns].find {|c| c[:source] == '_id'}[:name]
     end
   end
 end
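The column list is the biggest behavioural change: to_array still accepts the old one-pair entries (name: TYPE) and now also accepts entries carrying an explicit :source and :type, which is what lets a SQL column name differ from the Mongo field and allows dotted source paths. Below is a minimal sketch of a collection map in the shape the new code checks for; the database, collection, and column names are invented for illustration, and the map is written here as a plain Ruby hash rather than loaded from a collection-map file.

    require 'mosql'

    # Hypothetical mapping (illustrative names only).
    schema = MoSQL::Schema.new({
      "blogdb" => {
        "posts" => {
          :meta    => { :table => "blog_posts", :extra_props => true },
          :columns => [
            # New format: explicit :source/:type, so the SQL name ("id")
            # can differ from the Mongo field ("_id"), and dotted sources
            # such as "author.name" become possible.
            { "id"          => nil, :source => "_id",         :type => "TEXT" },
            { "author_name" => nil, :source => "author.name", :type => "TEXT" },
            # Old format, still accepted: one key, value is the SQL type.
            { "title" => "TEXT" }
          ]
        }
      }
    })

check_columns! would reject this map if two columns shared the same :source, and create_schema now declares the primary key on whichever column has :source _id ("id" here) instead of hard-coding an _id column.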
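Dotted sources work because transform now pulls values with fetch_and_delete_dotted, which walks the nested document, deletes the leaf, and prunes any parent hashes the deletion left empty, presumably so hollowed-out sub-documents don't linger for the _extra_props pass. A small sketch with made-up data, reusing the schema object from the previous example:

    obj = { "author" => { "name" => "Nelson", "email" => "nelson@example.com" } }

    schema.fetch_and_delete_dotted(obj, "author.name")
    # => "Nelson"; obj is now {"author" => {"email" => "nelson@example.com"}}

    schema.fetch_and_delete_dotted(obj, "author.email")
    # => "nelson@example.com"; "author" is now empty, so it is pruned and obj == {}

A missing intermediate key simply yields nil, since the walk bails out as soon as it reaches a non-Hash value.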
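Finally, because the primary key column is no longer guaranteed to be literally named _id, the new primary_sql_key_for_ns helper exposes the SQL-side name of the column whose :source is _id. With the hypothetical map above:

    schema.primary_sql_key_for_ns("blogdb.posts")
    # => "id"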