lib/mosql/streamer.rb in mosql-0.3.1 vs lib/mosql/streamer.rb in mosql-0.3.2

- old
+ new

@@ -91,27 +91,45 @@ unless options[:skip_tail] start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts'] end - @mongo.database_names.each do |dbname| - next unless spec = @schema.find_db(dbname) + dbnames = [] + + if dbname = options[:dbname] + log.info "Skipping DB scan and using db: #{dbname}" + dbnames = [ dbname ] + else + dbnames = @mongo.database_names + end + + dbnames.each do |dbname| + spec = @schema.find_db(dbname) + + if(spec.nil?) + log.info("Mongd DB '#{dbname}' not found in config file. Skipping.") + next + end + log.info("Importing for Mongo DB #{dbname}...") db = @mongo.db(dbname) - db.collections.select { |c| spec.key?(c.name) }.each do |collection| + collections = db.collections.select { |c| spec.key?(c.name) } + + collections.each do |collection| ns = "#{dbname}.#{collection.name}" - import_collection(ns, collection) + import_collection(ns, collection, spec[collection.name][:meta][:filter]) exit(0) if @done end end + tailer.write_timestamp(start_ts) unless options[:skip_tail] end def did_truncate; @did_truncate ||= {}; end - def import_collection(ns, collection) + def import_collection(ns, collection, filter) log.info("Importing for #{ns}...") count = 0 batch = [] table = @sql.table_for_ns(ns) unless options[:no_drop_tables] || did_truncate[table.first_source] @@ -119,10 +137,10 @@ did_truncate[table.first_source] = true end start = Time.now sql_time = 0 - collection.find(nil, :batch_size => BATCH) do |cursor| + collection.find(filter, :batch_size => BATCH) do |cursor| with_retries do cursor.each do |obj| batch << @schema.transform(ns, obj) count += 1