lib/mosql/streamer.rb in mosql-0.3.1 vs lib/mosql/streamer.rb in mosql-0.3.2
- old
+ new
@@ -91,27 +91,45 @@
unless options[:skip_tail]
start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
end
- @mongo.database_names.each do |dbname|
- next unless spec = @schema.find_db(dbname)
+ dbnames = []
+
+ if dbname = options[:dbname]
+ log.info "Skipping DB scan and using db: #{dbname}"
+ dbnames = [ dbname ]
+ else
+ dbnames = @mongo.database_names
+ end
+
+ dbnames.each do |dbname|
+ spec = @schema.find_db(dbname)
+
+ if(spec.nil?)
+ log.info("Mongd DB '#{dbname}' not found in config file. Skipping.")
+ next
+ end
+
log.info("Importing for Mongo DB #{dbname}...")
db = @mongo.db(dbname)
- db.collections.select { |c| spec.key?(c.name) }.each do |collection|
+ collections = db.collections.select { |c| spec.key?(c.name) }
+
+ collections.each do |collection|
ns = "#{dbname}.#{collection.name}"
- import_collection(ns, collection)
+ import_collection(ns, collection, spec[collection.name][:meta][:filter])
exit(0) if @done
end
end
+
tailer.write_timestamp(start_ts) unless options[:skip_tail]
end
def did_truncate; @did_truncate ||= {}; end
- def import_collection(ns, collection)
+ def import_collection(ns, collection, filter)
log.info("Importing for #{ns}...")
count = 0
batch = []
table = @sql.table_for_ns(ns)
unless options[:no_drop_tables] || did_truncate[table.first_source]
@@ -119,10 +137,10 @@
did_truncate[table.first_source] = true
end
start = Time.now
sql_time = 0
- collection.find(nil, :batch_size => BATCH) do |cursor|
+ collection.find(filter, :batch_size => BATCH) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
count += 1