o SbD@sVdZddlZddlmZmZmZmZmZmZm Z ddl m Z ddl m Z ddlmZddlmZmZddlmZmZdd lmZdd lmZdd lmZmZmZmZmZdd l m!Z!m"Z"m#Z#e$gd Z%er~ddl&m'Z'ddl(m)Z)ddl*m+Z+ddl,m-Z-Gdddee"Z.Gddde.ee"Z/Gddde.ee"Z0Gddde0ee"Z1dS)zAWatch changes on a collection, a database, or the entire cluster.N) TYPE_CHECKINGAnyDictGenericMappingOptionalUnion) _bson_to_dict)RawBSONDocument) Timestamp)_csotcommon)_CollectionAggregationCommand_DatabaseAggregationCommand)validate_collation_or_none) CommandCursor)ConnectionFailureCursorNotFoundInvalidOperationOperationFailure PyMongoError) _CollationIn _DocumentType _Pipeline)Y[ii)#i{'iP-iR-i{4i|4?iL4) ClientSession) Collection)Database) MongoClientc@s|eZdZdZ  d:deddeedeedeeee fdee d ee d ee d ee d ed deeee fdee deeddfddZ eddZeddZddZddZddZddZd d!Zd"d#Zd$d%Zd;d&d'Zd ChangeStreamaThe internal abstract base class for change stream cursors. Should not be called directly by application developers. Use :meth:`pymongo.collection.Collection.watch`, :meth:`pymongo.database.Database.watch`, or :meth:`pymongo.mongo_client.MongoClient.watch` instead. .. versionadded:: 3.6 .. seealso:: The MongoDB documentation on `changeStreams `_. Ntarget)zMongoClient[_DocumentType]zDatabase[_DocumentType]zCollection[_DocumentType]pipeline full_document resume_aftermax_await_time_ms batch_size collationstart_at_operation_timesessionr# start_aftercommentfull_document_before_changereturnc Cs|durg}td|}td|t|td|d|_|j|_|jjj r8d|_|j |jj t dd|_ n||_ t ||_||_| |_| du|_|du|_t | pV||_||_||_||_||_| |_| |_d|_|j j|_||_dS)Nr)r* batchSizeFT)Zdocument_class) codec_options)r Z validate_listZvalidate_string_or_nonerZ%validate_non_negative_integer_or_none_decode_customr6_orig_codec_optionsZ type_registryZ _decoder_mapZ with_optionsr _targetcopydeepcopy _pipeline_full_document_full_document_before_change_uses_start_after_uses_resume_after _resume_token_max_await_time_ms _batch_sizeZ _collation_start_at_operation_time_session_comment_closed_timeout_create_cursor_cursor) selfr(r)r*r+r,r-r.r/r0r1r2r3rL resume_tokenr?rD)rKoptionsrUrLrLrM_change_stream_optionss       z#ChangeStream._change_stream_optionscCs0i}|jdur |j|d<|jdur|j|d<|S)z4Return the options dict for the aggregation command.NZmaxAwaitTimeMSr5)rBrCrKrVrLrLrM_command_optionss     zChangeStream._command_optionscCs"|}d|ig}||j|S)z;Return the full aggregation pipeline for this ChangeStream.z $changeStream)rWextendr<)rKrVZ full_pipelinerLrLrM_aggregation_pipelines  z"ChangeStream._aggregation_pipelinecCs|dds;d|dvr|dd|_dS|jdur=|jdur?|jdurA|jdkrC|d|_|jdurEtd|fdSdSdSdSdSdS) aMCallback that caches the postBatchResumeToken or startAtOperationTime from a changeStream aggregate command response containing an empty batch of change documents. This is implemented as a callback because we need access to the wire version in order to determine whether to cache this value. cursorZ firstBatchZpostBatchResumeTokenNFrZ operationTimezAExpected field 'operationTime' missing from command response : %r)rArDr@r?Zmax_wire_versiongetr)rKresultZ sock_inforLrLrM_process_results*    zChangeStream._process_resultc Cs@|j|jt||||j|jd}|j|j |j ||S)ztRun the full aggregation pipeline for this ChangeStream and return the corresponding CommandCursor. )Zresult_processorr2) rSr9rr[rYr_rFrTZ_retryable_readZ get_cursorZ_read_preference_for)rKr0explicit_sessioncmdrLrLrM_run_aggregation_cmds z!ChangeStream._run_aggregation_cmdcCsJ|jj|jdd}|j||jdudWdS1swYdS)NFclose)r0r`)rTZ _tmp_sessionrErb)rKsrLrLrMrIs$zChangeStream._create_cursorcCs0z|jWn tyYnw||_dS)z7Reestablish this change stream after a resumable error.N)rJrdrrIrRrLrLrM_resumes  zChangeStream._resumecCsd|_|jdS)zClose this ChangeStream.TN)rGrJrdrRrLrLrMrdszChangeStream.closeChangeStream[_DocumentType]cC|SNrLrRrLrLrM__iter__zChangeStream.__iter__cCs t|jS)zThe cached resume token that will be used to resume after the most recently returned change. .. versionadded:: 3.9 )r:r;rArRrLrLrMrUs zChangeStream.resume_tokencCs$|jr|}|dur |S|jst)aAdvance the cursor. This method blocks until the next change document is returned or an unrecoverable error is raised. This method is used when iterating over all changes in the cursor. For example:: try: resume_token = None pipeline = [{'$match': {'operationType': 'insert'}}] with db.collection.watch(pipeline) as stream: for insert_change in stream: print(insert_change) resume_token = stream.resume_token except pymongo.errors.PyMongoError: # The ChangeStream encountered an unrecoverable error or the # resume attempt failed to recreate the cursor. if resume_token is None: # There is no usable resume token because there was a # failure during ChangeStream initialization. logging.error('...') else: # Use the interrupted ChangeStream's resume token to create # a new ChangeStream. The new stream will continue from the # last seen insert change without missing any events. with db.collection.watch( pipeline, resume_after=resume_token) as stream: for insert_change in stream: print(insert_change) Raises :exc:`StopIteration` if this ChangeStream is closed. N)alivetry_next StopIteration)rKdocrLrLrMnexts !zChangeStream.nextcCs|j S)zDoes this cursor have the potential to return more data? .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise :exc:`StopIteration` and :meth:`try_next` can return ``None``. .. versionadded:: 3.8 )rGrRrLrLrMrl(s zChangeStream.alivec Csn|js |jjs |z|jd}WnMttfy(||jd}Yn9ty`}z-|jdur5|jdko>| dpH|jdkoH|j t v}|sL||jd}WYd}~nd}~ww|jjshd|_|dur||jj durz|jj |_ d|_|Sz|d}Wnty|tdw|js|jj r|jj }d|_d|_||_ d|_|jrt|j|jS|S)aAdvance the cursor without blocking indefinitely. This method returns the next change document without waiting indefinitely for the next change. For example:: with db.collection.watch() as stream: while stream.alive: change = stream.try_next() # Note that the ChangeStream's resume token may be updated # even when no changes are returned. print("Current resume token: %r" % (stream.resume_token,)) if change is not None: print("Change document: %r" % (change,)) continue # We end up here when there are no recent changes. # Sleep for a while before trying again to avoid flooding # the server with getMore requests when no changes are # available. time.sleep(10) If no change document is cached locally then this method runs a single getMore command. If the getMore yields any documents, the next document is returned, otherwise, if the getMore returns no documents (because there have been no changes) then ``None`` is returned. :Returns: The next change document or ``None`` when no document is available after running a single getMore or when the cursor is closed. .. versionadded:: 3.8 TFN ZResumableChangeStreamErrorZ_idzECannot provide resume functionality when the resume token is missing.)rGrJrlrfZ _try_nextrrrZ_max_wire_versionZhas_error_labelcode_RESUMABLE_GETMORE_ERRORSZ_post_batch_resume_tokenrArDKeyErrorrdrZ _has_nextr?r@r7r rawr8)rKZchangeexcZ is_resumablerUrLrLrMrm3sX!      zChangeStream.try_nextcCrhrirLrRrLrLrM __enter__rkzChangeStream.__enter__exc_typeexc_valexc_tbcCs |dSrirc)rKrxryrzrLrLrM__exit__s zChangeStream.__exit__)NN)r4N)r4rg)r4r')%__name__ __module__ __qualname____doc__rrrstrrrintrr rNpropertyrSrTrWrYr[r_rbrIrfrdrjrUr applyrrp__next__boolrlrmrwr{rLrLrLrMr'Gsr      5     '  ^r'c@(eZdZdZeddZeddZdS)CollectionChangeStreamzA change stream that watches changes on a single collection. Should not be called directly by application developers. Use helper method :meth:`pymongo.collection.Collection.watch` instead. .. versionadded:: 3.7 cCtSri)rrRrLrLrMrSz1CollectionChangeStream._aggregation_command_classcCs |jjjSri)r9ZdatabaseclientrRrLrLrMrTs zCollectionChangeStream._clientNr|r}r~rrrSrTrLrLrLrMr  rc@r)DatabaseChangeStreamzA change stream that watches changes on all collections in a database. Should not be called directly by application developers. Use helper method :meth:`pymongo.database.Database.watch` instead. .. versionadded:: 3.7 cCrri)rrRrLrLrMrSrz/DatabaseChangeStream._aggregation_command_classcCs|jjSri)r9rrRrLrLrMrTszDatabaseChangeStream._clientNrrLrLrLrMrrrcs eZdZdZfddZZS)ClusterChangeStreamzA change stream that watches changes on all collections in the cluster. Should not be called directly by application developers. Use helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead. .. versionadded:: 3.7 cstt|}d|d<|S)NTZallChangesForCluster)superrrWrX __class__rLrMrWsz*ClusterChangeStream._change_stream_options)r|r}r~rrW __classcell__rLrLrrMrsr)2rr:typingrrrrrrrZbsonr Z bson.raw_bsonr Zbson.timestampr Zpymongor r Zpymongo.aggregationrrZpymongo.collationrZpymongo.command_cursorrZpymongo.errorsrrrrrZpymongo.typingsrrr frozensetrsZpymongo.client_sessionr#Zpymongo.collectionr$Zpymongo.databaser%Zpymongo.mongo_clientr&r'rrrrLrLrLrMs4$         T