module MoSQL
  class Streamer
    include MoSQL::Logging

    BATCH = 1000

    attr_reader :options, :tailer

    NEW_KEYS = [:options, :tailer, :mongo, :sql, :schema]

    def initialize(opts)
      NEW_KEYS.each do |parm|
        unless opts.key?(parm)
          raise ArgumentError.new("Required argument `#{parm}' not provided to #{self.class.name}#new.")
        end
        instance_variable_set(:"@#{parm}", opts[parm])
      end

      @done = false
    end

    def stop
      @done = true
    end

    # Run the initial bulk import if we're reimporting or have never
    # recorded an oplog position.
    def import
      if options[:reimport] || tailer.read_timestamp.seconds == 0
        initial_import
      end
    end

    def collection_for_ns(ns)
      dbname, collection = ns.split(".", 2)
      @mongo.db(dbname).collection(collection)
    end

    # Run the given block; if it raises a database error and --unsafe was
    # passed, log and skip the offending row instead of crashing.
    def unsafe_handle_exceptions(ns, obj)
      begin
        yield
      rescue Sequel::DatabaseError => e
        wrapped = e.wrapped_exception
        if wrapped.result && options[:unsafe]
          log.warn("Ignoring row (#{obj.inspect}): #{e}")
        else
          log.error("Error processing #{obj.inspect} for #{ns}.")
          raise e
        end
      end
    end

    # Try to COPY the whole batch in one statement; on failure, fall back
    # to row-by-row upserts so one bad row doesn't abort the entire batch.
    def bulk_upsert(table, ns, items)
      begin
        @schema.copy_data(table.db, ns, items)
      rescue Sequel::DatabaseError => e
        log.debug("Bulk insert error (#{e}), attempting individual upserts...")
        cols = @schema.all_columns(@schema.find_ns(ns))
        items.each do |it|
          h = {}
          cols.zip(it).each { |k, v| h[k] = v }
          unsafe_handle_exceptions(ns, h) do
            @sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
          end
        end
      end
    end

    # Retry the block with exponential backoff on transient Mongo errors.
    # Duplicate-key and dead-cursor failures are not retryable, so those
    # are re-raised immediately.
    def with_retries(tries=10)
      tries.times do |try|
        begin
          yield
          break # success -- don't run the block again
        rescue Mongo::ConnectionError, Mongo::ConnectionFailure, Mongo::OperationFailure => e
          # Duplicate key error
          raise if e.kind_of?(Mongo::OperationFailure) && [11000, 11001].include?(e.error_code)
          # Cursor timeout
          raise if e.kind_of?(Mongo::OperationFailure) && e.message =~ /^Query response returned CURSOR_NOT_FOUND/
          delay = 0.5 * (1.5 ** try)
          log.warn("Mongo exception: #{e}, sleeping #{delay}s...")
          sleep(delay)
        end
      end
    end

    # Yield, returning the wall-clock time the block took.
    def track_time
      start = Time.now
      yield
      Time.now - start
    end

    def initial_import
      @schema.create_schema(@sql.db, !options[:no_drop_tables])

      # Record the current oplog position *before* importing, so tailing
      # resumes from a point that covers any writes made while the import
      # was running.
      unless options[:skip_tail]
        start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
      end

      @mongo.database_names.each do |dbname|
        next unless spec = @schema.find_db(dbname)
        log.info("Importing for Mongo DB #{dbname}...")
        db = @mongo.db(dbname)
        db.collections.select { |c| spec.key?(c.name) }.each do |collection|
          ns = "#{dbname}.#{collection.name}"
          import_collection(ns, collection)
          exit(0) if @done
        end
      end

      tailer.write_timestamp(start_ts) unless options[:skip_tail]
    end

    def did_truncate; @did_truncate ||= {}; end

    def import_collection(ns, collection)
      log.info("Importing for #{ns}...")

      count = 0
      batch = []
      table = @sql.table_for_ns(ns)
      unless options[:no_drop_tables] || did_truncate[table.first_source]
        table.truncate
        did_truncate[table.first_source] = true
      end

      start    = Time.now
      sql_time = 0
      collection.find(nil, :batch_size => BATCH) do |cursor|
        with_retries do
          cursor.each do |obj|
            batch << @schema.transform(ns, obj)
            count += 1

            if batch.length >= BATCH
              sql_time += track_time do
                bulk_upsert(table, ns, batch)
              end
              elapsed = Time.now - start
              log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
              batch.clear
              exit(0) if @done
            end
          end
        end
      end

      unless batch.empty?
        bulk_upsert(table, ns, batch)
      end
    end

    def optail
      tailer.tail_from(options[:tail_from] ?
                       BSON::Timestamp.new(options[:tail_from].to_i, 0) : nil)
      until @done
        tailer.stream(1000) do |op|
          handle_op(op)
        end
      end
    end

    # Re-fetch an object from Mongo and upsert it into SQL, or delete the
    # SQL row if the object no longer exists.
    def sync_object(ns, _id)
      primary_sql_key = @schema.primary_sql_key_for_ns(ns)
      sqlid = @sql.transform_one_ns(ns, { '_id' => _id })[primary_sql_key]
      obj = collection_for_ns(ns).find_one({:_id => _id})
      if obj
        unsafe_handle_exceptions(ns, obj) do
          @sql.upsert_ns(ns, obj)
        end
      else
        @sql.table_for_ns(ns).where(primary_sql_key.to_sym => sqlid).delete()
      end
    end

    def handle_op(op)
      log.debug("processing op: #{op.inspect}")
      unless op['ns'] && op['op']
        log.warn("Weird op: #{op.inspect}")
        return
      end

      unless @schema.find_ns(op['ns'])
        log.debug("Skipping op for unknown ns #{op['ns']}...")
        return
      end

      ns = op['ns']
      dbname, collection_name = ns.split(".", 2)

      case op['op']
      when 'n'
        log.debug("Skipping no-op #{op.inspect}")
      when 'i'
        if collection_name == 'system.indexes'
          log.info("Skipping index update: #{op.inspect}")
        else
          unsafe_handle_exceptions(ns, op['o']) do
            @sql.upsert_ns(ns, op['o'])
          end
        end
      when 'u'
        selector = op['o2']
        update   = op['o']
        if update.keys.any? { |k| k.start_with? '$' }
          # $-operator updates (e.g. $set) only carry the changed fields,
          # so re-fetch the full object rather than applying the partial
          # update.
          log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
          sync_object(ns, selector['_id'])
        else
          log.debug("upsert #{ns}: _id=#{selector['_id']}")

          # The update operation replaces the existing object, but
          # preserves its _id field, so grab the _id off of the
          # 'query' field -- it's not guaranteed to be present on the
          # update.
          update = { '_id' => selector['_id'] }.merge(update)
          unsafe_handle_exceptions(ns, update) do
            @sql.upsert_ns(ns, update)
          end
        end
      when 'd'
        if options[:ignore_delete]
          log.debug("Ignoring delete op on #{ns} as instructed.")
        else
          @sql.delete_ns(ns, op['o'])
        end
      else
        log.info("Skipping unknown op #{op.inspect}")
      end
    end
  end
end
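
# A minimal usage sketch, not part of the library: wiring a Streamer
# together by hand, roughly the way MoSQL's CLI does. The connection URIs,
# the `collections.yml` schema file, and the exact Tailer/SQLAdapter
# constructor arguments below are illustrative assumptions and may differ
# between MoSQL versions.
if __FILE__ == $0
  require 'yaml'
  require 'mongo'
  require 'mosql'

  schema = MoSQL::Schema.new(YAML.load_file('collections.yml'))
  mongo  = Mongo::MongoClient.from_uri('mongodb://localhost:27017')
  sql    = MoSQL::SQLAdapter.new(schema, 'postgres://localhost/mosql')
  tailer = MoSQL::Tailer.new([mongo], :sql, sql.db, :service => 'mosql') # signature assumed

  streamer = MoSQL::Streamer.new(:options => {},
                                 :tailer  => tailer,
                                 :mongo   => mongo,
                                 :sql     => sql,
                                 :schema  => schema)

  streamer.import # initial bulk import; skipped if a timestamp is already recorded
  streamer.optail # then follow the oplog until #stop is called
end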