lib/mosql/cli.rb in mosql-0.1.2 vs lib/mosql/cli.rb in mosql-0.2.0

- old
+ new

@@ -104,12 +104,15 @@ end def connect_mongo @mongo = Mongo::Connection.from_uri(options[:mongo]) config = @mongo['admin'].command(:ismaster => 1) - if !config['setName'] - log.warn("`#{options[:mongo]}' is not a replset. Proceeding anyways...") + if !config['setName'] && !options[:skip_tail] + log.warn("`#{options[:mongo]}' is not a replset.") + log.warn("Will run the initial import, then stop.") + log.warn("Pass `--skip-tail' to suppress this warning.") + options[:skip_tail] = true end options[:service] ||= config['setName'] end def connect_sql @@ -119,11 +122,11 @@ @sql.db.loggers << Logger.new($stderr) end end def load_collections - collections = YAML.load(File.read(@options[:collections])) + collections = YAML.load_file(@options[:collections]) @schemamap = MoSQL::Schema.new(collections) end def run parse_args @@ -138,11 +141,13 @@ if options[:reimport] || tailer.read_timestamp.seconds == 0 initial_import end - optail + unless options[:skip_tail] + optail + end end # Helpers def collection_for_ns(ns) @@ -157,11 +162,11 @@ log.debug("Bulk insert error (#{e}), attempting invidual upserts...") cols = @schemamap.all_columns(@schemamap.find_ns(ns)) items.each do |it| h = {} cols.zip(it).each { |k,v| h[k] = v } - @sql.upsert(table, h) + @sql.upsert(table, @schemamap.primary_sql_key_for_ns(ns), h) end end end def with_retries(tries=10) @@ -187,11 +192,13 @@ end def initial_import @schemamap.create_schema(@sql.db, !options[:no_drop_tables]) - start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts'] + unless options[:skip_tail] + start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts'] + end want_dbs = @schemamap.all_mongo_dbs & @mongo.database_names want_dbs.each do |dbname| log.info("Importing for Mongo DB #{dbname}...") db = @mongo.db(dbname) @@ -201,11 +208,11 @@ import_collection(ns, collection) exit(0) if @done end end - tailer.write_timestamp(start_ts) + tailer.write_timestamp(start_ts) unless options[:skip_tail] end def import_collection(ns, collection) log.info("Importing for #{ns}...") count = 0 @@ -238,12 +245,10 @@ bulk_upsert(table, ns, batch) end end def optail - return if options[:skip_tail] - tailer.tail_from(options[:tail_from] ? BSON::Timestamp.new(options[:tail_from].to_i, 0) : nil) until @done tailer.stream(1000) do |op| @@ -251,15 +256,16 @@ end end end def sync_object(ns, _id) - sqlid = @sql.transform_one_ns(ns, { '_id' => _id })['_id'] - obj = collection_for_ns(ns).find_one({:_id => _id}) + primary_sql_key = @schemamap.primary_sql_key_for_ns(ns) + sqlid = @sql.transform_one_ns(ns, { '_id' => _id })[primary_sql_key] + obj = collection_for_ns(ns).find_one({:_id => _id}) if obj @sql.upsert_ns(ns, obj) else - @sql.table_for_ns(ns).where(:_id => sqlid).delete() + @sql.table_for_ns(ns).where(primary_sql_key.to_sym => sqlid).delete() end end def handle_op(op) log.debug("processing op: #{op.inspect}")