lib/mosql/streamer.rb in mosql-0.3.2 vs lib/mosql/streamer.rb in mosql-0.4.0
- old
+ new
@@ -22,11 +22,11 @@
def stop
@done = true
end
def import
- if options[:reimport] || tailer.read_timestamp.seconds == 0
+ if options[:reimport] || tailer.read_position.nil?
initial_import
end
end
def collection_for_ns(ns)
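The `import` gate changes its notion of "never imported": 0.3.2 persisted a raw `BSON::Timestamp` and used `seconds == 0` as the fresh-install sentinel, whereas 0.4.0 delegates persistence to mongoriver and treats a `nil` saved position the same way. A minimal sketch of the new check, assuming `read_position` returns the last saved oplog position, or `nil` before any state has been saved:

    # Sketch only (not mosql source): 0.4.0's initial-import decision,
    # with needs_initial_import? as a hypothetical helper name.
    def needs_initial_import?(tailer, options)
      options[:reimport] || tailer.read_position.nil?
    end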
@@ -88,18 +88,21 @@
def initial_import
@schema.create_schema(@sql.db, !options[:no_drop_tables])
unless options[:skip_tail]
- start_ts = @mongo['local']['oplog.rs'].find_one({}, {:sort => [['$natural', -1]]})['ts']
+ start_state = {
+ 'time' => nil,
+ 'position' => @tailer.most_recent_position
+ }
end
dbnames = []
- if dbname = options[:dbname]
- log.info "Skipping DB scan and using db: #{dbname}"
- dbnames = [ dbname ]
+ if options[:dbname]
+ log.info "Skipping DB scan and using db: #{options[:dbname]}"
+ dbnames = [ options[:dbname] ]
else
dbnames = @mongo.database_names
end
dbnames.each do |dbname|
@@ -119,12 +122,11 @@
import_collection(ns, collection, spec[collection.name][:meta][:filter])
exit(0) if @done
end
end
-
- tailer.write_timestamp(start_ts) unless options[:skip_tail]
+ tailer.save_state(start_state) unless options[:skip_tail]
end
def did_truncate; @did_truncate ||= {}; end
def import_collection(ns, collection, filter)
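Together, the two hunks above replace the single stored timestamp with a small state hash handed off to mongoriver: the position is captured before the collection copy starts and saved only once the copy finishes, so a crash mid-import leaves no saved state and the next run re-imports cleanly. A hedged sketch of the round trip, assuming `save_state` and `read_position` persist and fetch that hash (the exact on-disk format is owned by mongoriver's persistent tailer, not this snippet):

    start_state = {
      'time'     => nil,                          # filled in as ops are later streamed
      'position' => tailer.most_recent_position   # oplog position captured up front
    }
    # ... initial collection copy runs here ...
    tailer.save_state(start_state)

    # A subsequent run resumes from the saved state:
    tailer.read_position   # => the saved position, or nil on a fresh install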
@@ -162,40 +164,56 @@
bulk_upsert(table, ns, batch)
end
end
def optail
- tailer.tail_from(options[:tail_from] ?
- BSON::Timestamp.new(options[:tail_from].to_i, 0) :
- nil)
+ tail_from = options[:tail_from]
+ if tail_from.is_a? Time
+ tail_from = tailer.most_recent_position(tail_from)
+ end
+ tailer.tail(:from => tail_from)
until @done
tailer.stream(1000) do |op|
handle_op(op)
end
end
end
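`optail` no longer constructs a `BSON::Timestamp` itself: `options[:tail_from]` may now arrive either as a ready-made oplog position or as a `Time`, which is first mapped to a position via `most_recent_position`. A short usage sketch (the option plumbing around the call is assumed):

    require 'time'

    # Assumed call site: seed tailing from a wall-clock time.
    options = { :tail_from => Time.parse('2014-06-01 00:00:00 UTC') }

    tail_from = options[:tail_from]
    tail_from = tailer.most_recent_position(tail_from) if tail_from.is_a?(Time)
    tailer.tail(:from => tail_from)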
- def sync_object(ns, _id)
- primary_sql_key = @schema.primary_sql_key_for_ns(ns)
- sqlid = @sql.transform_one_ns(ns, { '_id' => _id })[primary_sql_key]
- obj = collection_for_ns(ns).find_one({:_id => _id})
+ def sync_object(ns, selector)
+ obj = collection_for_ns(ns).find_one(selector)
if obj
unsafe_handle_exceptions(ns, obj) do
@sql.upsert_ns(ns, obj)
end
else
- @sql.table_for_ns(ns).where(primary_sql_key.to_sym => sqlid).delete()
+ primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
+ schema = @schema.find_ns!(ns)
+ query = {}
+ primary_sql_keys.each do |key|
+ source = schema[:columns].find {|c| c[:name] == key }[:source]
+ query[key] = selector[source]
+ end
+ @sql.table_for_ns(ns).where(query).delete()
end
end
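Because `sync_object` now receives the full oplog selector rather than a bare `_id`, the delete path can address tables keyed on one or several columns. A worked example of the query-building loop above, using a hypothetical composite key (`org_id`/`user_id` sourced from the Mongo fields `org`/`uid`):

    selector         = { 'org' => 42, 'uid' => 7 }
    primary_sql_keys = ['org_id', 'user_id']
    columns          = [
      { :name => 'org_id',  :source => 'org' },
      { :name => 'user_id', :source => 'uid' }
    ]

    query = {}
    primary_sql_keys.each do |key|
      source = columns.find { |c| c[:name] == key }[:source]
      query[key] = selector[source]
    end
    query   # => { 'org_id' => 42, 'user_id' => 7 }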
def handle_op(op)
log.debug("processing op: #{op.inspect}")
unless op['ns'] && op['op']
log.warn("Weird op: #{op.inspect}")
return
end
+ # First, check if this was an operation performed via applyOps. If so, call handle_op
+ # for each op that was applied.
+ # The oplog format of applyOps commands can be viewed here:
+ # https://groups.google.com/forum/#!topic/mongodb-user/dTf5VEJJWvY
+ if op['op'] == 'c' && (ops = op['o']['applyOps'])
+ ops.each { |applied_op| handle_op(applied_op) }
+ return
+ end
+
unless @schema.find_ns(op['ns'])
log.debug("Skipping op for unknown ns #{op['ns']}...")
return
end
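For reference, an applyOps entry wraps ordinary oplog ops inside `o.applyOps`, and the new branch simply re-dispatches each wrapped op through `handle_op`. An abridged example of the shape (field values are illustrative):

    op = {
      'op' => 'c',
      'ns' => 'mydb.$cmd',
      'o'  => {
        'applyOps' => [
          { 'op' => 'i', 'ns' => 'mydb.users',
            'o'  => { '_id' => 1, 'name' => 'alice' } },
          { 'op' => 'u', 'ns' => 'mydb.users',
            'o2' => { '_id' => 1 }, 'o' => { 'name' => 'bob' } }
        ]
      }
    }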
@@ -216,18 +234,27 @@
when 'u'
selector = op['o2']
update = op['o']
if update.keys.any? { |k| k.start_with? '$' }
log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
- sync_object(ns, selector['_id'])
+ sync_object(ns, selector)
else
- log.debug("upsert #{ns}: _id=#{selector['_id']}")
# The update operation replaces the existing object, but
# preserves its _id field, so grab the _id off of the
# 'query' field -- it's not guaranteed to be present on the
# update.
- update = { '_id' => selector['_id'] }.merge(update)
+ primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
+ schema = @schema.find_ns!(ns)
+ keys = {}
+ primary_sql_keys.each do |key|
+ source = schema[:columns].find {|c| c[:name] == key }[:source]
+ keys[key] = selector[source]
+ end
+
+ log.debug("upsert #{ns}: #{keys}")
+
+ update = keys.merge(update)
unsafe_handle_exceptions(ns, update) do
@sql.upsert_ns(ns, update)
end
end
when 'd'
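The non-`$` update branch mirrors the `sync_object` change: rather than re-attaching only `_id`, it rebuilds every primary key column from the selector and merges them into the replacement document before the upsert. Continuing the hypothetical two-column schema from the earlier example:

    selector = { 'org' => 42, 'uid' => 7 }
    update   = { 'name' => 'carol' }                # replacement doc; keys may be absent
    keys     = { 'org_id' => 42, 'user_id' => 7 }   # built exactly as in the hunk above

    keys.merge(update)
    # => { 'org_id' => 42, 'user_id' => 7, 'name' => 'carol' }
    # upsert_ns can then address the row by its full composite key.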