lib/mosql/schema.rb in mosql-0.1.2 vs lib/mosql/schema.rb in mosql-0.2.0
- old
+ new
@@ -2,48 +2,76 @@
class SchemaError < StandardError; end;
class Schema
include MoSQL::Logging
- def to_ordered_hash(lst)
- hash = BSON::OrderedHash.new
+ def to_array(lst)
+ array = []
lst.each do |ent|
- raise "Invalid ordered hash entry #{ent.inspect}" unless ent.is_a?(Hash) && ent.keys.length == 1
- field, type = ent.first
- hash[field] = type
+ if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String)
+ # new configuration format
+ array << {
+ :source => ent.delete(:source),
+ :type => ent.delete(:type),
+ :name => ent.first.first,
+ }
+ elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
+ array << {
+ :source => ent.first.first,
+ :name => ent.first.first,
+ :type => ent.first.last
+ }
+ else
+ raise "Invalid ordered hash entry #{ent.inspect}"
+ end
+
end
- hash
+ array
end
- def parse_spec(spec)
+ def check_columns!(ns, spec)
+ seen = Set.new
+ spec[:columns].each do |col|
+ if seen.include?(col[:source])
+ raise "Duplicate source #{col[:source]} in column definition #{col[:name]} for #{ns}."
+ end
+ seen.add(col[:source])
+ end
+ end
+
+ def parse_spec(ns, spec)
out = spec.dup
- out[:columns] = to_ordered_hash(spec[:columns])
+ out[:columns] = to_array(spec[:columns])
+ check_columns!(ns, out)
out
end
def initialize(map)
@map = {}
map.each do |dbname, db|
@map[dbname] ||= {}
db.each do |cname, spec|
- @map[dbname][cname] = parse_spec(spec)
+ @map[dbname][cname] = parse_spec("#{dbname}.#{cname}", spec)
end
end
end
def create_schema(db, clobber=false)
@map.values.map(&:values).flatten.each do |collection|
meta = collection[:meta]
log.info("Creating table '#{meta[:table]}'...")
db.send(clobber ? :create_table! : :create_table?, meta[:table]) do
- collection[:columns].each do |field, type|
- column field, type
+ collection[:columns].each do |col|
+ column col[:name], col[:type]
+
+ if col[:source].to_sym == :_id
+ primary_key [col[:name].to_sym]
+ end
end
if meta[:extra_props]
column '_extra_props', 'TEXT'
end
- primary_key [:_id]
end
end
end
def find_ns(ns)
@@ -60,17 +88,40 @@
schema = find_ns(ns)
raise SchemaError.new("No mapping for namespace: #{ns}") if schema.nil?
schema
end
+ def fetch_and_delete_dotted(obj, dotted)
+ pieces = dotted.split(".")
+ breadcrumbs = []
+ while pieces.length > 1
+ key = pieces.shift
+ breadcrumbs << [obj, key]
+ obj = obj[key]
+ return nil unless obj.is_a?(Hash)
+ end
+
+ val = obj.delete(pieces.first)
+
+ breadcrumbs.reverse.each do |obj, key|
+ obj.delete(key) if obj[key].empty?
+ end
+
+ val
+ end
+
def transform(ns, obj, schema=nil)
schema ||= find_ns!(ns)
obj = obj.dup
row = []
- schema[:columns].each do |name, type|
- v = obj.delete(name)
+ schema[:columns].each do |col|
+
+ source = col[:source]
+ type = col[:type]
+
+ v = fetch_and_delete_dotted(obj, source)
case v
when BSON::Binary, BSON::ObjectId
v = v.to_s
end
row << v
@@ -89,20 +140,22 @@
row
end
def all_columns(schema)
- cols = schema[:columns].keys
+ cols = []
+ schema[:columns].each do |col|
+ cols << col[:name]
+ end
if schema[:meta][:extra_props]
cols << "_extra_props"
end
cols
end
def copy_data(db, ns, objs)
schema = find_ns!(ns)
- data = objs.map { |o| transform_to_copy(ns, o, schema) }.join("\n")
db.synchronize do |pg|
sql = "COPY \"#{schema[:meta][:table]}\" " +
"(#{all_columns(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"
pg.execute(sql)
objs.each do |o|
@@ -142,8 +195,12 @@
@map.keys
end
def collections_for_mongo_db(db)
(@map[db]||{}).keys
+ end
+
+ def primary_sql_key_for_ns(ns)
+ find_ns!(ns)[:columns].find {|c| c[:source] == '_id'}[:name]
end
end
end