lib/mongo/collection/view/change_stream.rb in mongo-2.20.1 vs lib/mongo/collection/view/change_stream.rb in mongo-2.21.0
- old
+ new
@@ -13,10 +13,11 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'mongo/collection/view/aggregation/behavior'
require 'mongo/collection/view/change_stream/retryable'
module Mongo
class Collection
class View
@@ -33,11 +34,12 @@
# green thread, therefore calling #next on the change stream will cause
# getMores to be called in a loop in the background.
#
#
# @since 2.5.0
- class ChangeStream < Aggregation
+ class ChangeStream
+ include Aggregation::Behavior
include Retryable
# @return [ String ] The fullDocument option default value.
#
# @since 2.5.0
@@ -58,10 +60,14 @@
# @return [ BSON::Document ] The change stream options.
#
# @since 2.5.0
attr_reader :options
+ # @return [ Cursor ] the underlying cursor for this operation
+ # @api private
+ attr_reader :cursor
+
# Initialize the change stream for the provided collection view, pipeline
# and options.
#
# @example Create the new change stream view.
# ChangeStream.new(view, pipeline, options)
@@ -123,15 +129,17 @@
#
# The server will report an error if `startAfter` and `resumeAfter` are both specified.
#
# @since 2.5.0
def initialize(view, pipeline, changes_for, options = {})
- @view = view
- @changes_for = changes_for
- @change_stream_filters = pipeline && pipeline.dup
- @options = options && options.dup.freeze
- @start_after = @options[:start_after]
+ # change stream cursors can only be :iterable, so we don't allow
+ # timeout_mode to be specified.
+ perform_setup(view, options, forbid: %i[ timeout_mode ]) do
+ @changes_for = changes_for
+ @change_stream_filters = pipeline && pipeline.dup
+ @start_after = @options[:start_after]
+ end
# The resume token tracked by the change stream, used only
# when there is no cursor, or no cursor resume token
@resume_token = @start_after || @options[:resume_after]
@@ -179,28 +187,34 @@
# it will return nil.
#
# @return [ BSON::Document | nil ] A change stream document.
# @since 2.6.0
def try_next
+ recreate_cursor! if @timed_out
+
raise StopIteration.new if closed?
+
begin
doc = @cursor.try_next
rescue Mongo::Error => e
- if !e.change_stream_resumable?
- raise
- end
+ # "If a next call fails with a timeout error, drivers MUST NOT
+ # invalidate the change stream. The subsequent next call MUST
+ # perform a resume attempt to establish a new change stream on the
+ # server..."
+ #
+ # However, SocketTimeoutErrors are TimeoutErrors, but are also
+ # change-stream-resumable. To preserve existing (specified) behavior,
+ # We only count timeouts when the error is not also
+ # change-stream-resumable.
+ @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable?
- # Rerun initial aggregation.
- # Any errors here will stop iteration and break out of this
- # method.
+ raise unless @timed_out || e.change_stream_resumable?
- # Save cursor's resume token so we can use it
- # to create a new cursor
@resume_token = @cursor.resume_token
+ raise e if @timed_out
- close
- create_cursor!
+ recreate_cursor!(@cursor.context)
retry
end
# We need to verify each doc has an _id, so we
# have a resume token to work with
@@ -229,18 +243,21 @@
# @note This method attempts to close the cursor used by the change
# stream, which in turn closes the server-side change stream cursor.
# This method ignores any errors that occur when closing the
# server-side cursor.
#
+ # @params [ Hash ] opts Options to be passed to the cursor close
+ # command.
+ #
# @return [ nil ] Always nil.
#
# @since 2.5.0
- def close
+ def close(opts = {})
unless closed?
begin
- @cursor.close
- rescue Error::OperationFailure, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
+ @cursor.close(opts)
+ rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
# ignore
end
@cursor = nil
end
end
@@ -282,10 +299,32 @@
def resume_token
cursor_resume_token = @cursor.resume_token if @cursor
cursor_resume_token || @resume_token
end
+ # "change streams are an abstraction around tailable-awaitData cursors..."
+ #
+ # @return :tailable_await
+ def cursor_type
+ :tailable_await
+ end
+
+ # "change streams...implicitly use ITERATION mode"
+ #
+ # @return :iteration
+ def timeout_mode
+ :iteration
+ end
+
+ # Returns the value of the max_await_time_ms option that was
+ # passed to this change stream.
+ #
+ # @return [ Integer | nil ] the max_await_time_ms value
+ def max_await_time_ms
+ options[:max_await_time_ms]
+ end
+
private
def for_cluster?
@changes_for == CLUSTER
end
@@ -296,23 +335,27 @@
def for_collection?
!for_cluster? && !for_database?
end
- def create_cursor!
+ def create_cursor!(timeout_ms = nil)
# clear the cache because we may get a newer or an older server
# (rolling upgrades)
@start_at_operation_time_supported = nil
- session = client.send(:get_session, @options)
+ session = client.get_session(@options)
+ context = Operation::Context.new(client: client, session: session, view: self, operation_timeouts: timeout_ms ? { operation_timeout_ms: timeout_ms } : operation_timeouts)
+
start_at_operation_time = nil
start_at_operation_time_supported = nil
- @cursor = read_with_retry_cursor(session, server_selector, view) do |server|
+
+ @cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
server.with_connection do |connection|
start_at_operation_time_supported = connection.description.server_version_gte?('4.0')
- result = send_initial_query(connection, session)
+ result = send_initial_query(connection, context)
+
if doc = result.replies.first && result.replies.first.documents.first
start_at_operation_time = doc['operationTime']
else
# The above may set @start_at_operation_time to nil
# if it was not in the document for some reason,
@@ -322,10 +365,11 @@
start_at_operation_time = nil
end
result
end
end
+
@start_at_operation_time = start_at_operation_time
@start_at_operation_time_supported = start_at_operation_time_supported
end
def pipeline
@@ -388,15 +432,15 @@
doc[:allChangesForCluster] = true if for_cluster?
end
end
- def send_initial_query(connection, session)
- initial_query_op(session, view.read_preference)
+ def send_initial_query(connection, context)
+ initial_query_op(context.session, view.read_preference)
.execute_with_connection(
connection,
- context: Operation::Context.new(client: client, session: session),
+ context: context,
)
end
def time_to_bson_timestamp(time)
if time.is_a?(Time)
@@ -409,9 +453,18 @@
end
end
def resuming?
!!@resuming
+ end
+
+ # Recreates the current cursor (typically as a consequence of attempting
+ # to resume the change stream)
+ def recreate_cursor!(context = nil)
+ @timed_out = false
+
+ close
+ create_cursor!(context&.remaining_timeout_ms)
end
end
end
end
end