lib/mosql/streamer.rb in mosql-0.3.2 vs lib/mosql/streamer.rb in mosql-0.4.0

- old
+ new

@@ -22,11 +22,11 @@
     def stop
       @done = true
     end

     def import
-      if options[:reimport] || tailer.read_timestamp.seconds == 0
+      if options[:reimport] || tailer.read_position.nil?
         initial_import
       end
     end

     def collection_for_ns(ns)
@@ -88,18 +88,21 @@
     def initial_import
       @schema.create_schema(@sql.db, !options[:no_drop_tables])

       unless options[:skip_tail]
-        start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
+        start_state = {
+          'time' => nil,
+          'position' => @tailer.most_recent_position
+        }
       end

       dbnames = []

-      if dbname = options[:dbname]
-        log.info "Skipping DB scan and using db: #{dbname}"
-        dbnames = [ dbname ]
+      if options[:dbname]
+        log.info "Skipping DB scan and using db: #{options[:dbname]}"
+        dbnames = [ options[:dbname] ]
       else
         dbnames = @mongo.database_names
       end

       dbnames.each do |dbname|
@@ -119,12 +122,11 @@
           import_collection(ns, collection, spec[collection.name][:meta][:filter])
           exit(0) if @done
         end
       end

-
-      tailer.write_timestamp(start_ts) unless options[:skip_tail]
+      tailer.save_state(start_state) unless options[:skip_tail]
     end

     def did_truncate; @did_truncate ||= {}; end

     def import_collection(ns, collection, filter)
@@ -162,40 +164,56 @@
         bulk_upsert(table, ns, batch)
       end
     end

     def optail
-      tailer.tail_from(options[:tail_from] ?
-                       BSON::Timestamp.new(options[:tail_from].to_i, 0) :
-                       nil)
+      tail_from = options[:tail_from]
+      if tail_from.is_a? Time
+        tail_from = tailer.most_recent_position(tail_from)
+      end
+      tailer.tail(:from => tail_from)
       until @done
         tailer.stream(1000) do |op|
           handle_op(op)
         end
       end
     end

-    def sync_object(ns, _id)
-      primary_sql_key = @schema.primary_sql_key_for_ns(ns)
-      sqlid = @sql.transform_one_ns(ns, { '_id' => _id })[primary_sql_key]
-      obj = collection_for_ns(ns).find_one({:_id => _id})
+    def sync_object(ns, selector)
+      obj = collection_for_ns(ns).find_one(selector)
       if obj
         unsafe_handle_exceptions(ns, obj) do
           @sql.upsert_ns(ns, obj)
         end
       else
-        @sql.table_for_ns(ns).where(primary_sql_key.to_sym => sqlid).delete()
+        primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
+        schema = @schema.find_ns!(ns)
+        query = {}
+        primary_sql_keys.each do |key|
+          source = schema[:columns].find {|c| c[:name] == key }[:source]
+          query[key] = selector[source]
+        end
+        @sql.table_for_ns(ns).where(query).delete()
       end
     end

     def handle_op(op)
       log.debug("processing op: #{op.inspect}")
       unless op['ns'] && op['op']
         log.warn("Weird op: #{op.inspect}")
         return
       end

+      # First, check if this was an operation performed via applyOps.
+      # If so, call handle_op for each op that was applied.
+      # The oplog format of applyOps commands can be viewed here:
+      # https://groups.google.com/forum/#!topic/mongodb-user/dTf5VEJJWvY
+      if op['op'] == 'c' && (ops = op['o']['applyOps'])
+        ops.each { |op| handle_op(op) }
+        return
+      end
+
       unless @schema.find_ns(op['ns'])
         log.debug("Skipping op for unknown ns #{op['ns']}...")
         return
       end
@@ -216,18 +234,27 @@
       when 'u'
         selector = op['o2']
         update   = op['o']
         if update.keys.any? { |k| k.start_with? '$' }
           log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
-          sync_object(ns, selector['_id'])
+          sync_object(ns, selector)
         else
-          log.debug("upsert #{ns}: _id=#{selector['_id']}")
           # The update operation replaces the existing object, but
           # preserves its _id field, so grab the _id off of the
           # 'query' field -- it's not guaranteed to be present on the
           # update.
-          update = { '_id' => selector['_id'] }.merge(update)
+          primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
+          schema = @schema.find_ns!(ns)
+          keys = {}
+          primary_sql_keys.each do |key|
+            source = schema[:columns].find {|c| c[:name] == key }[:source]
+            keys[key] = selector[source]
+          end
+
+          log.debug("upsert #{ns}: #{keys}")
+
+          update = keys.merge(update)
           unsafe_handle_exceptions(ns, update) do
             @sql.upsert_ns(ns, update)
           end
         end
       when 'd'
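
The recurring pattern in 0.4.0 — both sync_object and the 'u' branch map the schema's primary SQL keys back to fields on the oplog selector — is what lets a table be keyed on columns other than just _id. A minimal standalone sketch of that mapping: the schema hash shape (:columns with :name/:source) follows the diff, while the key names, collection fields, and values are invented for illustration.

    # Sketch: resolve composite SQL keys from an oplog selector.
    # Schema shape mirrors the diff; concrete names/values are hypothetical.
    schema = {
      :columns => [
        { :name => 'id',     :source => '_id' },
        { :name => 'region', :source => 'region' },
      ]
    }
    primary_sql_keys = ['id', 'region']  # what primary_sql_key_for_ns would return
    selector = { '_id' => 'abc123', 'region' => 'us-east' }  # the op's 'o2' query

    query = {}
    primary_sql_keys.each do |key|
      source = schema[:columns].find { |c| c[:name] == key }[:source]
      query[key] = selector[source]
    end
    # query == { 'id' => 'abc123', 'region' => 'us-east' }, ready for
    # @sql.table_for_ns(ns).where(query).delete() or keys.merge(update)

Note that the mapping assumes the selector actually carries every key's source field; MongoDB guarantees that for _id, but any additional key column depends on the update's query including it.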
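
The new applyOps branch in handle_op is recursive: a 'c' (command) op whose body contains an applyOps array is unpacked, and each inner op is fed back through handle_op. A runnable toy version with handle_op reduced to a print so the recursion is visible — the op hashes follow the oplog format linked in the diff, but the namespaces and documents are made up:

    # Toy version of the applyOps unwrapping; the real handle_op dispatches
    # on op['op'] ('i'/'u'/'d') instead of printing.
    def handle_op(op)
      if op['op'] == 'c' && (ops = op['o']['applyOps'])
        ops.each { |inner| handle_op(inner) }
        return
      end
      puts "apply #{op['op']} to #{op['ns']}: #{op['o'].inspect}"
    end

    handle_op({
      'op' => 'c',
      'ns' => 'admin.$cmd',
      'o'  => {
        'applyOps' => [
          { 'op' => 'i', 'ns' => 'db.users', 'o' => { '_id' => 1, 'name' => 'a' } },
          { 'op' => 'd', 'ns' => 'db.users', 'o' => { '_id' => 2 } },
        ]
      }
    })
    # apply i to db.users: {"_id"=>1, "name"=>"a"}
    # apply d to db.users: {"_id"=>2}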