lib/mongo/collection/view/change_stream.rb in mongo-2.20.1 vs lib/mongo/collection/view/change_stream.rb in mongo-2.21.0

- old
+ new

@@ -13,10 +13,11 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/collection/view/aggregation/behavior' require 'mongo/collection/view/change_stream/retryable' module Mongo class Collection class View @@ -33,11 +34,12 @@ # green thread, therefore calling #next on the change stream will cause # getMores to be called in a loop in the background. # # # @since 2.5.0 - class ChangeStream < Aggregation + class ChangeStream + include Aggregation::Behavior include Retryable # @return [ String ] The fullDocument option default value. # # @since 2.5.0 @@ -58,10 +60,14 @@ # @return [ BSON::Document ] The change stream options. # # @since 2.5.0 attr_reader :options + # @return [ Cursor ] the underlying cursor for this operation + # @api private + attr_reader :cursor + # Initialize the change stream for the provided collection view, pipeline # and options. # # @example Create the new change stream view. # ChangeStream.new(view, pipeline, options) @@ -123,15 +129,17 @@ # # The server will report an error if `startAfter` and `resumeAfter` are both specified. # # @since 2.5.0 def initialize(view, pipeline, changes_for, options = {}) - @view = view - @changes_for = changes_for - @change_stream_filters = pipeline && pipeline.dup - @options = options && options.dup.freeze - @start_after = @options[:start_after] + # change stream cursors can only be :iterable, so we don't allow + # timeout_mode to be specified. + perform_setup(view, options, forbid: %i[ timeout_mode ]) do + @changes_for = changes_for + @change_stream_filters = pipeline && pipeline.dup + @start_after = @options[:start_after] + end # The resume token tracked by the change stream, used only # when there is no cursor, or no cursor resume token @resume_token = @start_after || @options[:resume_after] @@ -179,28 +187,34 @@ # it will return nil. # # @return [ BSON::Document | nil ] A change stream document. # @since 2.6.0 def try_next + recreate_cursor! if @timed_out + raise StopIteration.new if closed? + begin doc = @cursor.try_next rescue Mongo::Error => e - if !e.change_stream_resumable? - raise - end + # "If a next call fails with a timeout error, drivers MUST NOT + # invalidate the change stream. The subsequent next call MUST + # perform a resume attempt to establish a new change stream on the + # server..." + # + # However, SocketTimeoutErrors are TimeoutErrors, but are also + # change-stream-resumable. To preserve existing (specified) behavior, + # We only count timeouts when the error is not also + # change-stream-resumable. + @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable? - # Rerun initial aggregation. - # Any errors here will stop iteration and break out of this - # method. + raise unless @timed_out || e.change_stream_resumable? - # Save cursor's resume token so we can use it - # to create a new cursor @resume_token = @cursor.resume_token + raise e if @timed_out - close - create_cursor! + recreate_cursor!(@cursor.context) retry end # We need to verify each doc has an _id, so we # have a resume token to work with @@ -229,18 +243,21 @@ # @note This method attempts to close the cursor used by the change # stream, which in turn closes the server-side change stream cursor. # This method ignores any errors that occur when closing the # server-side cursor. # + # @params [ Hash ] opts Options to be passed to the cursor close + # command. + # # @return [ nil ] Always nil. # # @since 2.5.0 - def close + def close(opts = {}) unless closed? begin - @cursor.close - rescue Error::OperationFailure, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection + @cursor.close(opts) + rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection # ignore end @cursor = nil end end @@ -282,10 +299,32 @@ def resume_token cursor_resume_token = @cursor.resume_token if @cursor cursor_resume_token || @resume_token end + # "change streams are an abstraction around tailable-awaitData cursors..." + # + # @return :tailable_await + def cursor_type + :tailable_await + end + + # "change streams...implicitly use ITERATION mode" + # + # @return :iteration + def timeout_mode + :iteration + end + + # Returns the value of the max_await_time_ms option that was + # passed to this change stream. + # + # @return [ Integer | nil ] the max_await_time_ms value + def max_await_time_ms + options[:max_await_time_ms] + end + private def for_cluster? @changes_for == CLUSTER end @@ -296,23 +335,27 @@ def for_collection? !for_cluster? && !for_database? end - def create_cursor! + def create_cursor!(timeout_ms = nil) # clear the cache because we may get a newer or an older server # (rolling upgrades) @start_at_operation_time_supported = nil - session = client.send(:get_session, @options) + session = client.get_session(@options) + context = Operation::Context.new(client: client, session: session, view: self, operation_timeouts: timeout_ms ? { operation_timeout_ms: timeout_ms } : operation_timeouts) + start_at_operation_time = nil start_at_operation_time_supported = nil - @cursor = read_with_retry_cursor(session, server_selector, view) do |server| + + @cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server| server.with_connection do |connection| start_at_operation_time_supported = connection.description.server_version_gte?('4.0') - result = send_initial_query(connection, session) + result = send_initial_query(connection, context) + if doc = result.replies.first && result.replies.first.documents.first start_at_operation_time = doc['operationTime'] else # The above may set @start_at_operation_time to nil # if it was not in the document for some reason, @@ -322,10 +365,11 @@ start_at_operation_time = nil end result end end + @start_at_operation_time = start_at_operation_time @start_at_operation_time_supported = start_at_operation_time_supported end def pipeline @@ -388,15 +432,15 @@ doc[:allChangesForCluster] = true if for_cluster? end end - def send_initial_query(connection, session) - initial_query_op(session, view.read_preference) + def send_initial_query(connection, context) + initial_query_op(context.session, view.read_preference) .execute_with_connection( connection, - context: Operation::Context.new(client: client, session: session), + context: context, ) end def time_to_bson_timestamp(time) if time.is_a?(Time) @@ -409,9 +453,18 @@ end end def resuming? !!@resuming + end + + # Recreates the current cursor (typically as a consequence of attempting + # to resume the change stream) + def recreate_cursor!(context = nil) + @timed_out = false + + close + create_cursor!(context&.remaining_timeout_ms) end end end end end