lib/mosql/cli.rb in mosql-0.2.0 vs lib/mosql/cli.rb in mosql-0.3.0
- old
+ new
@@ -25,11 +25,11 @@
def setup_signal_handlers
%w[TERM INT USR2].each do |sig|
Signal.trap(sig) do
log.info("Got SIG#{sig}. Preparing to exit...")
- @done = true
+ @streamer.stop
end
end
end
def parse_args
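
In 0.2.0 the trap handler set an instance flag (`@done`) that the import and tailing loops polled; in 0.3.0 it asks the new Streamer to stop itself, so shutdown state lives next to the loops that honor it. A minimal sketch of that cooperative-stop pattern, assuming a `@done` flag inside the streamer (the real MoSQL::Streamer may differ in detail):

    class Streamer
      def initialize
        @done = false
      end

      # Safe to call from a Signal.trap handler: it only flips a flag.
      def stop
        @done = true
      end

      def optail
        until @done
          sleep 1   # placeholder for streaming one batch of oplog entries
        end
      end
    end
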
@@ -88,10 +88,14 @@
end
opts.on("--no-drop-tables", "Don't drop the table if it exists during the initial import") do
@options[:no_drop_tables] = true
end
+
+ opts.on("--unsafe", "Ignore rows that cause errors on insert") do
+ @options[:unsafe] = true
+ end
end
optparse.parse!(@args)
log = Log4r::Logger.new('Stripe')
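
The new `--unsafe` flag only records `@options[:unsafe]` here; the behavior lives downstream. One plausible way such a flag gets consumed is to swallow per-row insert errors instead of aborting the whole import. This is an illustration of the idea only (helper name is hypothetical), not the actual 0.3.0 Streamer code:

    require 'sequel'

    # Upsert one row; skip it on database errors when :unsafe was requested,
    # re-raise otherwise.
    def upsert_row(sql_adapter, ns, row, options, logger)
      sql_adapter.upsert_ns(ns, row)
    rescue Sequel::DatabaseError => e
      raise unless options[:unsafe]
      logger.warn("--unsafe: ignoring row in #{ns} that failed to insert: #{e.message}")
    end
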
@@ -102,11 +106,11 @@
log.level = Log4r::INFO
end
end
def connect_mongo
- @mongo = Mongo::Connection.from_uri(options[:mongo])
+ @mongo = Mongo::MongoClient.from_uri(options[:mongo])
config = @mongo['admin'].command(:ismaster => 1)
if !config['setName'] && !options[:skip_tail]
log.warn("`#{options[:mongo]}' is not a replset.")
log.warn("Will run the initial import, then stop.")
log.warn("Pass `--skip-tail' to suppress this warning.")
@@ -114,20 +118,26 @@
end
options[:service] ||= config['setName']
end
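
The `Mongo::Connection` to `Mongo::MongoClient` switch tracks the 1.8-era Ruby driver, where `MongoClient` became the supported entry point (acknowledged writes by default) and `Connection` was kept only for backward compatibility. A minimal sketch of the same connect-and-ismaster check outside the CLI; the URI is a placeholder:

    require 'mongo'

    client = Mongo::MongoClient.from_uri('mongodb://localhost:27017')
    config = client['admin'].command(:ismaster => 1)
    puts config['setName'] || '(standalone server, not a replica set)'
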
def connect_sql
- @sql = MoSQL::SQLAdapter.new(@schemamap, options[:sql], options[:schema])
+ @sql = MoSQL::SQLAdapter.new(@schema, options[:sql], options[:schema])
if options[:verbose] >= 2
@sql.db.sql_log_level = :debug
@sql.db.loggers << Logger.new($stderr)
end
end
def load_collections
collections = YAML.load_file(@options[:collections])
- @schemamap = MoSQL::Schema.new(collections)
+ begin
+ @schema = MoSQL::Schema.new(collections)
+ rescue MoSQL::SchemaError => e
+ log.error("Error parsing collection map `#{@options[:collections]}':")
+ log.error(e.to_s)
+ exit(1)
+ end
end
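
`load_collections` now rescues `MoSQL::SchemaError`, so a malformed collection map produces a readable error instead of a backtrace. For reference, a minimal collection map in the shape mosql's collection files take (the database, collection, and column names below are purely illustrative):

    require 'yaml'
    require 'mosql'

    # Maps Mongo database -> collection -> column list and target SQL table.
    collections = YAML.load(<<-MAP)
    blog:
      posts:
        :columns:
        - id:
          :source: _id
          :type: TEXT
        - title: TEXT
        - author: TEXT
        :meta:
          :table: blog_posts
    MAP

    schema = MoSQL::Schema.new(collections)
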
def run
parse_args
load_collections
@@ -137,186 +147,19 @@
metadata_table = MoSQL::Tailer.create_table(@sql.db, 'mosql_tailers')
@tailer = MoSQL::Tailer.new([@mongo], :existing, metadata_table,
:service => options[:service])
- if options[:reimport] || tailer.read_timestamp.seconds == 0
- initial_import
- end
+ @streamer = Streamer.new(:options => @options,
+ :tailer => @tailer,
+ :mongo => @mongo,
+ :sql => @sql,
+ :schema => @schema)
- unless options[:skip_tail]
- optail
- end
- end
+ @streamer.import
- # Helpers
-
- def collection_for_ns(ns)
- dbname, collection = ns.split(".", 2)
- @mongo.db(dbname).collection(collection)
- end
-
- def bulk_upsert(table, ns, items)
- begin
- @schemamap.copy_data(table.db, ns, items)
- rescue Sequel::DatabaseError => e
- log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
- cols = @schemamap.all_columns(@schemamap.find_ns(ns))
- items.each do |it|
- h = {}
- cols.zip(it).each { |k,v| h[k] = v }
- @sql.upsert(table, @schemamap.primary_sql_key_for_ns(ns), h)
- end
- end
- end
-
- def with_retries(tries=10)
- tries.times do |try|
- begin
- yield
- rescue Mongo::ConnectionError, Mongo::ConnectionFailure, Mongo::OperationFailure => e
- # Duplicate key error
- raise if e.kind_of?(Mongo::OperationFailure) && [11000, 11001].include?(e.error_code)
- # Cursor timeout
- raise if e.kind_of?(Mongo::OperationFailure) && e.message =~ /^Query response returned CURSOR_NOT_FOUND/
- delay = 0.5 * (1.5 ** try)
- log.warn("Mongo exception: #{e}, sleeping #{delay}s...")
- sleep(delay)
- end
- end
- end
-
- def track_time
- start = Time.now
- yield
- Time.now - start
- end
-
- def initial_import
- @schemamap.create_schema(@sql.db, !options[:no_drop_tables])
-
unless options[:skip_tail]
- start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
- end
-
- want_dbs = @schemamap.all_mongo_dbs & @mongo.database_names
- want_dbs.each do |dbname|
- log.info("Importing for Mongo DB #{dbname}...")
- db = @mongo.db(dbname)
- want = Set.new(@schemamap.collections_for_mongo_db(dbname))
- db.collections.select { |c| want.include?(c.name) }.each do |collection|
- ns = "#{dbname}.#{collection.name}"
- import_collection(ns, collection)
- exit(0) if @done
- end
- end
-
- tailer.write_timestamp(start_ts) unless options[:skip_tail]
- end
-
- def import_collection(ns, collection)
- log.info("Importing for #{ns}...")
- count = 0
- batch = []
- table = @sql.table_for_ns(ns)
- table.truncate unless options[:no_drop_tables]
-
- start = Time.now
- sql_time = 0
- collection.find(nil, :batch_size => BATCH) do |cursor|
- with_retries do
- cursor.each do |obj|
- batch << @schemamap.transform(ns, obj)
- count += 1
-
- if batch.length >= BATCH
- sql_time += track_time do
- bulk_upsert(table, ns, batch)
- end
- elapsed = Time.now - start
- log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
- batch.clear
- exit(0) if @done
- end
- end
- end
- end
-
- unless batch.empty?
- bulk_upsert(table, ns, batch)
- end
- end
-
- def optail
- tailer.tail_from(options[:tail_from] ?
- BSON::Timestamp.new(options[:tail_from].to_i, 0) :
- nil)
- until @done
- tailer.stream(1000) do |op|
- handle_op(op)
- end
- end
- end
-
- def sync_object(ns, _id)
- primary_sql_key = @schemamap.primary_sql_key_for_ns(ns)
- sqlid = @sql.transform_one_ns(ns, { '_id' => _id })[primary_sql_key]
- obj = collection_for_ns(ns).find_one({:_id => _id})
- if obj
- @sql.upsert_ns(ns, obj)
- else
- @sql.table_for_ns(ns).where(primary_sql_key.to_sym => sqlid).delete()
- end
- end
-
- def handle_op(op)
- log.debug("processing op: #{op.inspect}")
- unless op['ns'] && op['op']
- log.warn("Weird op: #{op.inspect}")
- return
- end
-
- unless @schemamap.find_ns(op['ns'])
- log.debug("Skipping op for unknown ns #{op['ns']}...")
- return
- end
-
- ns = op['ns']
- dbname, collection_name = ns.split(".", 2)
-
- case op['op']
- when 'n'
- log.debug("Skipping no-op #{op.inspect}")
- when 'i'
- if collection_name == 'system.indexes'
- log.info("Skipping index update: #{op.inspect}")
- else
- @sql.upsert_ns(ns, op['o'])
- end
- when 'u'
- selector = op['o2']
- update = op['o']
- if update.keys.any? { |k| k.start_with? '$' }
- log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
- sync_object(ns, selector['_id'])
- else
- log.debug("upsert #{ns}: _id=#{selector['_id']}")
-
- # The update operation replaces the existing object, but
- # preserves its _id field, so grab the _id off of the
- # 'query' field -- it's not guaranteed to be present on the
- # update.
- update = { '_id' => selector['_id'] }.merge(update)
- @sql.upsert_ns(ns, update)
- end
- when 'd'
- if options[:ignore_delete]
- log.debug("Ignoring delete op on #{ns} as instructed.")
- else
- @sql.delete_ns(ns, op['o'])
- end
- else
- log.info("Skipping unknown op #{op.inspect}")
+ @streamer.optail
end
end
end
end
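
Taken together, `run` now wires everything into the new `MoSQL::Streamer` and delegates to it. Inferring only from this diff (the constructor keywords and the `import`/`optail`/`stop` calls), the usage looks roughly like this, with `options`, `tailer`, `mongo`, `sql`, and `schema` set up as above:

    streamer = MoSQL::Streamer.new(:options => options,
                                   :tailer  => tailer,
                                   :mongo   => mongo,
                                   :sql     => sql,
                                   :schema  => schema)

    streamer.import                              # bulk import (was CLI#initial_import)
    streamer.optail unless options[:skip_tail]   # follow the oplog (was CLI#optail)
    # streamer.stop is invoked from the signal handlers to request a clean exit
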