lib/mosql/cli.rb in mosql-0.1.2 vs lib/mosql/cli.rb in mosql-0.2.0
- removed in 0.2.0 (present only in 0.1.2)
+ added in 0.2.0
@@ -104,12 +104,15 @@
end
def connect_mongo
@mongo = Mongo::Connection.from_uri(options[:mongo])
config = @mongo['admin'].command(:ismaster => 1)
- if !config['setName']
- log.warn("`#{options[:mongo]}' is not a replset. Proceeding anyways...")
+ if !config['setName'] && !options[:skip_tail]
+ log.warn("`#{options[:mongo]}' is not a replset.")
+ log.warn("Will run the initial import, then stop.")
+ log.warn("Pass `--skip-tail' to suppress this warning.")
+ options[:skip_tail] = true
end
options[:service] ||= config['setName']
end
def connect_sql
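The new check keys off the `setName` field of the `ismaster` reply: replica-set members report their set name there, while a standalone mongod omits it, and without a replica set there is no oplog to tail. A minimal sketch of that check using the same mongo 1.x driver API as above (the URI is a placeholder):

    require 'mongo'

    conn  = Mongo::Connection.from_uri('mongodb://localhost:27017')
    reply = conn['admin'].command(:ismaster => 1)
    if reply['setName']
      puts "replica set #{reply['setName']}: oplog tailing is available"
    else
      puts 'standalone mongod: import once, then stop (no oplog to tail)'
    end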
@@ -119,11 +122,11 @@
@sql.db.loggers << Logger.new($stderr)
end
end
def load_collections
- collections = YAML.load(File.read(@options[:collections]))
+ collections = YAML.load_file(@options[:collections])
@schemamap = MoSQL::Schema.new(collections)
end
def run
parse_args
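`YAML.load_file(path)` is equivalent to the old `YAML.load(File.read(path))` but skips the intermediate string and reads more clearly. A self-contained sketch with a made-up collection map (the keys below are illustrative only, not MoSQL's actual schema format):

    require 'yaml'

    File.write('collections.yml', <<~EOS)
      blog:
        posts:
          columns:
            - _id: TEXT
            - title: TEXT
    EOS

    collections = YAML.load_file('collections.yml')  # same result as YAML.load(File.read(...))
    collections['blog']['posts']['columns'].length   # => 2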
@@ -138,11 +141,13 @@
if options[:reimport] || tailer.read_timestamp.seconds == 0
initial_import
end
- optail
+ unless options[:skip_tail]
+ optail
+ end
end
# Helpers
def collection_for_ns(ns)
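The net effect is a one-shot mode: with --skip-tail set explicitly, or set automatically by connect_mongo above when Mongo is not a replica set, run performs the initial import and returns without entering the tailing loop. A stand-alone sketch of that control flow (the method names here are stubs, not the gem's):

    def do_initial_import; puts 'importing collections...'; end
    def tail_oplog;        puts 'tailing the oplog...';     end

    def run_once(skip_tail:, reimport:, last_ts:)
      do_initial_import if reimport || last_ts == 0
      tail_oplog unless skip_tail   # one-shot: import only, then return
    end

    run_once(skip_tail: true, reimport: true, last_ts: 0)   # imports, then stops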
@@ -157,11 +162,11 @@
log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
cols = @schemamap.all_columns(@schemamap.find_ns(ns))
items.each do |it|
h = {}
cols.zip(it).each { |k,v| h[k] = v }
- @sql.upsert(table, h)
+ @sql.upsert(table, @schemamap.primary_sql_key_for_ns(ns), h)
end
end
end
def with_retries(tries=10)
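The extra argument is needed because an upsert must know which column identifies the row, and with the schema map now able to choose the SQL primary key, '_id' can no longer be assumed. A generic keyed-upsert sketch on top of Sequel (illustrative only, not the adapter in sql.rb, and not race-safe; assumes the sequel and sqlite3 gems):

    require 'sequel'

    DB = Sequel.sqlite   # in-memory database for the sketch
    DB.create_table(:posts) { String :id, primary_key: true; String :title }

    # Update the row identified by +key+; insert it if nothing was updated.
    def upsert(dataset, key, row)
      updated = dataset.where(key.to_sym => row[key]).update(row)
      dataset.insert(row) if updated == 0
    end

    upsert(DB[:posts], 'id', 'id' => '1', 'title' => 'hello')
    upsert(DB[:posts], 'id', 'id' => '1', 'hello, again')
    DB[:posts].count   # => 1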
@@ -187,11 +192,13 @@
end
def initial_import
@schemamap.create_schema(@sql.db, !options[:no_drop_tables])
- start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
+ unless options[:skip_tail]
+ start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
+ end
want_dbs = @schemamap.all_mongo_dbs & @mongo.database_names
want_dbs.each do |dbname|
log.info("Importing for Mongo DB #{dbname}...")
db = @mongo.db(dbname)
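Sorting on `$natural` descending returns the most recent oplog entry, so `start_ts` records where tailing should resume once the import finishes; a standalone mongod (no replica set) has no `local.oplog.rs` to read, hence the new guard. Reading that timestamp in isolation looks like this (same mongo 1.x API; URI is a placeholder):

    require 'mongo'

    conn  = Mongo::Connection.from_uri('mongodb://localhost:27017')
    oplog = conn['local']['oplog.rs']
    last  = oplog.find_one({}, :sort => [['$natural', -1]])
    puts(last ? "latest oplog position: #{last['ts'].inspect}" : 'no oplog found')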
@@ -201,11 +208,11 @@
import_collection(ns, collection)
exit(0) if @done
end
end
- tailer.write_timestamp(start_ts)
+ tailer.write_timestamp(start_ts) unless options[:skip_tail]
end
def import_collection(ns, collection)
log.info("Importing for #{ns}...")
count = 0
@@ -238,12 +245,10 @@
bulk_upsert(table, ns, batch)
end
end
def optail
- return if options[:skip_tail]
-
tailer.tail_from(options[:tail_from] ?
BSON::Timestamp.new(options[:tail_from].to_i, 0) :
nil)
until @done
tailer.stream(1000) do |op|
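For context, `BSON::Timestamp.new(seconds, increment)` is the oplog position the tail-from option is converted into; the increment disambiguates operations recorded within the same second. A quick sketch (bson 1.x era API; the seconds reader is used by read_timestamp above, the increment reader is assumed):

    require 'bson'

    ts = BSON::Timestamp.new(Time.utc(2013, 6, 1).to_i, 0)
    puts ts.seconds     # => 1370044800
    puts ts.increment   # => 0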
@@ -251,15 +256,16 @@
end
end
end
def sync_object(ns, _id)
- sqlid = @sql.transform_one_ns(ns, { '_id' => _id })['_id']
- obj = collection_for_ns(ns).find_one({:_id => _id})
+ primary_sql_key = @schemamap.primary_sql_key_for_ns(ns)
+ sqlid = @sql.transform_one_ns(ns, { '_id' => _id })[primary_sql_key]
+ obj = collection_for_ns(ns).find_one({:_id => _id})
if obj
@sql.upsert_ns(ns, obj)
else
- @sql.table_for_ns(ns).where(:_id => sqlid).delete()
+ @sql.table_for_ns(ns).where(primary_sql_key.to_sym => sqlid).delete()
end
end
def handle_op(op)
log.debug("processing op: #{op.inspect}")