lib/mongo/collection/view/change_stream.rb in mongo-2.5.3 vs lib/mongo/collection/view/change_stream.rb in mongo-2.6.0
- old
+ new
@@ -1,6 +1,6 @@
-# Copyright (C) 2017 MongoDB, Inc.
+# Copyright (C) 2017-2018 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -17,18 +17,20 @@
module Mongo
class Collection
class View
# Provides behaviour around a `$changeStream` pipeline stage in the
- # aggregation framework. Specifying this stage allows users to request that
- # notifications are sent for all changes to a particular collection or database.
+ # aggregation framework. Specifying this stage allows users to request
+ # that notifications are sent for all changes to a particular collection
+ # or database.
#
# @note Only available in server versions 3.6 and higher.
- # @note ChangeStreams do not work properly with JRuby because of the issue documented
- # here: https://github.com/jruby/jruby/issues/4212
- # Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread.
- # So calling #next on the change stream will cause getmores to be called in a loop in the background.
+ # @note ChangeStreams do not work properly with JRuby because of the
+ # issue documented here: https://github.com/jruby/jruby/issues/4212.
+ # Namely, JRuby eagerly evaluates #next on an Enumerator in a background
+ # green thread, therefore calling #next on the change stream will cause
+ # getMores to be called in a loop in the background.
#
#
# @since 2.5.0
class ChangeStream < Aggregation
include Retryable
@@ -36,10 +38,22 @@
# @return [ String ] The fullDocument option default value.
#
# @since 2.5.0
FULL_DOCUMENT_DEFAULT = 'default'.freeze
+ # @return [ Symbol ] Used to indicate that the change stream should listen for changes on
+ # the entire database rather than just the collection.
+ #
+ # @since 2.6.0
+ DATABASE = :database
+
+ # @return [ Symbol ] Used to indicate that the change stream should listen for changes on
+ # the entire cluster rather than just the collection.
+ #
+ # @since 2.6.0
+ CLUSTER = :cluster
+
# @return [ BSON::Document ] The change stream options.
#
# @since 2.5.0
attr_reader :options
@@ -51,32 +65,45 @@
#
# @param [ Collection::View ] view The collection view.
# @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications.
# @param [ Hash ] options The change stream options.
#
- # @option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.
- # When set to ‘updateLookup’, the change notification for partial updates will include both a delta
+ # @option options [ String ] :full_document Allowed values: 'default', 'updateLookup'. Defaults to 'default'.
+ # When set to 'updateLookup', the change notification for partial updates will include both a delta
# describing the changes to the document, as well as a copy of the entire document that was changed
# from some time after the change occurred.
# @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
# new change stream.
# @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
# on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
+ # @option options [ BSON::Timestamp ] :start_at_operation_time Only
+ # return changes that occurred at or after the specified timestamp. Any
+ # command run against the server will return a cluster time that can
+ # be used here. Only recognized by server versions 4.0+.
#
# @since 2.5.0
- def initialize(view, pipeline, options = {})
+ def initialize(view, pipeline, changes_for, options = {})
@view = view
+ @changes_for = changes_for
@change_stream_filters = pipeline && pipeline.dup
@options = options && options.dup.freeze
@resume_token = @options[:resume_after]
- read_with_one_retry { create_cursor! }
+ create_cursor!
+
+ # We send different parameters when we resume a change stream
+ # compared to when we send the first query
+ @resuming = true
end
# Iterate through documents returned by the change stream.
#
+ # This method retries once per error on resumable errors
+ # (two consecutive errors result in the second error being raised,
+ # an error which is recovered from resets the error count to zero).
+ #
# @example Iterate through the stream of documents.
# stream.each do |document|
# p document
# end
#
@@ -85,26 +112,88 @@
# @since 2.5.0
#
# @yieldparam [ BSON::Document ] Each change stream document.
def each
raise StopIteration.new if closed?
+ retried = false
begin
@cursor.each do |doc|
cache_resume_token(doc)
yield doc
end if block_given?
@cursor.to_enum
- rescue => e
+ rescue Mongo::Error => e
+ if retried || !e.change_stream_resumable?
+ raise
+ end
+
+ retried = true
+ # Rerun initial aggregation.
+ # Any errors here will stop iteration and break out of this
+ # method
close
- if retryable?(e)
+ create_cursor!
+ retry
+ end
+ end
+
+ # Return one document from the change stream, if one is available.
+ #
+ # Retries once on a resumable error.
+ #
+ # Raises StopIteration if the change stream is closed.
+ #
+ # This method will wait up to max_await_time_ms milliseconds
+ # for changes from the server, and if no changes are received
+ # it will return nil.
+ #
+ # @note This method is experimental and subject to change.
+ #
+ # @return [ BSON::Document | nil ] A change stream document.
+ # @api private
+ def try_next
+ raise StopIteration.new if closed?
+ retried = false
+
+ begin
+ doc = @cursor.try_next
+ rescue Mongo::Error => e
+ unless e.change_stream_resumable?
+ raise
+ end
+
+ if retried
+ # Rerun initial aggregation.
+ # Any errors here will stop iteration and break out of this
+ # method
+ close
create_cursor!
+ retried = false
+ else
+ # Attempt to retry a getMore once
+ retried = true
retry
end
- raise
end
+
+ if doc
+ cache_resume_token(doc)
+ end
+ doc
end
+ def to_enum
+ enum = super
+ enum.send(:instance_variable_set, '@obj', self)
+ class << enum
+ def try_next
+ @obj.try_next
+ end
+ end
+ enum
+ end
+
# Close the change stream.
#
# @example Close the change stream.
# stream.close
#
@@ -143,30 +232,117 @@
"options=#{@options} resume_token=#{@resume_token}>"
end
private
+ def for_cluster?
+ @changes_for == CLUSTER
+ end
+
+ def for_database?
+ @changes_for == DATABASE
+ end
+
+ def for_collection?
+ !for_cluster? && !for_database?
+ end
+
def cache_resume_token(doc)
+ # Always record both resume token and operation time,
+ # in case we get an older or newer server during rolling
+ # upgrades/downgrades
unless @resume_token = (doc[:_id] && doc[:_id].dup)
- raise Error::MissingResumeToken.new
+ raise Error::MissingResumeToken
end
end
def create_cursor!
+ # clear the cache because we may get a newer or an older server
+ # (rolling upgrades)
+ @start_at_operation_time_supported = nil
+
session = client.send(:get_session, @options)
server = server_selector.select_server(cluster)
result = send_initial_query(server, session)
+ if doc = result.replies.first && result.replies.first.documents.first
+ @start_at_operation_time = doc['operationTime']
+ else
+ # The above may set @start_at_operation_time to nil
+ # if it was not in the document for some reason,
+ # for consistency set it to nil here as well
+ @start_at_operation_time = nil
+ end
@cursor = Cursor.new(view, result, server, disable_retry: true, session: session)
end
def pipeline
- change_doc = { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }
- change_doc[:resumeAfter] = @resume_token if @resume_token
[{ '$changeStream' => change_doc }] + @change_stream_filters
end
+ def aggregate_spec(session)
+ super(session).tap do |spec|
+ spec[:selector][:aggregate] = 1 unless for_collection?
+ end
+ end
+
+ def change_doc
+ { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }.tap do |doc|
+ if resuming?
+ # We have a resume token once we retrieved any documents.
+ # However, if the first getMore fails and the user didn't pass
+ # a resume token we won't have a resume token to use.
+ # Use start_at_operation time in this case
+ if @resume_token
+ # Spec says we need to remove startAtOperationTime if
+ # one was passed in by user, thus we won't forward it
+ elsif start_at_operation_time_supported? && @start_at_operation_time
+ # It is crucial to check @start_at_operation_time_supported
+ # here - we may have switched to an older server that
+ # does not support operation times and therefore shouldn't
+ # try to send one to it!
+ #
+ # @start_at_operation_time is already a BSON::Timestamp
+ doc[:startAtOperationTime] = @start_at_operation_time
+ else
+ # Can't resume if we don't have either
+ raise Mongo::Error::MissingResumeToken
+ end
+ else
+ if options[:start_at_operation_time]
+ doc[:startAtOperationTime] = time_to_bson_timestamp(
+ options[:start_at_operation_time])
+ end
+ end
+ doc[:resumeAfter] = @resume_token if @resume_token
+ doc[:allChangesForCluster] = true if for_cluster?
+ end
+ end
+
def send_initial_query(server, session)
initial_query_op(session).execute(server)
+ end
+
+ def time_to_bson_timestamp(time)
+ if time.is_a?(Time)
+ seconds = time.to_f
+ BSON::Timestamp.new(seconds.to_i, ((seconds - seconds.to_i) * 1000000).to_i)
+ elsif time.is_a?(BSON::Timestamp)
+ time
+ else
+ raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance'
+ end
+ end
+
+ def resuming?
+ !!@resuming
+ end
+
+ def start_at_operation_time_supported?
+ if @start_at_operation_time_supported.nil?
+ server = server_selector.select_server(cluster)
+ @start_at_operation_time_supported = server.description.max_wire_version >= 7
+ end
+ @start_at_operation_time_supported
end
end
end
end
end