o Sb5@sLdZddlZddlZddlZddlZddlZddlZddlZddlm Z ddl m Z m Z m Z mZddlmZddlmZmZmZmZmZmZmZmZmZddlmZddlmZdd lm Z dd l!m"Z"dd l#m$Z$dd l%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+dd l,m-Z-m.Z.m/Z/m0Z0m1Z1ddZ2Gddde3Z4Gddde3Z5ddZ6ddZ7dS)zZ!d?d@Z"dAdBZ#dCdDZ$dEdFZ%dGdHZ&dIdJZ'dKdLZ(dMdNZ)dOdPZ*dQdRZ+dSdTZ,dUdVZ-dWdXZ.dYdZZ/d[d\Z0d]d^Z1dS)cTopologyz*Monitor a topology of one or more servers.cs|j|_|jj|_|jdu}|o|jj|_|o|jj|_d|_d|_ |js(|jr/t j dd|_|jrE|jdus9J|j |jj |jff||_t|||jdd|}||_|jr}|jduscJttjiddd|j}|j |jj||j|jff|jD]}|jr|jdusJ|j |jj||jffqt||_d|_d|_t|_ |j!|j |_"i|_#d|_$d|_%t&|_'|js|jrfdd}t(j)t*j+t*j,|dd}t-.|j|j/||_ |0d|_1|jj2dur|jj3st4||j|_1dSdSdS)Nd)maxsizeFcstSN)r*r(weakr(r)targetsz!Topology.__init__..targetZpymongo_events_thread)intervalZ min_intervalr1name)5 _topology_idZ _pool_options_event_listeners _listenersZenabled_for_server_publish_serverZenabled_for_topology _publish_tp_events_Topology__events_executorr"QueueputZpublish_topology_opened _settingsrZget_topology_typeZget_server_descriptionsreplica_set_name _descriptionrUnknown$publish_topology_description_changedseedsZpublish_server_openedlistserver_descriptions_seed_addresses_opened_closed threadingLock_lockZcondition_class _condition_servers_pid_max_cluster_timer _session_poolrZPeriodicExecutorrZEVENTS_QUEUE_FREQUENCYMIN_HEARTBEAT_INTERVALweakrefrefcloseopen _srv_monitorfqdn load_balancedr)selftopology_settingsZpubZtopology_descriptionZ initial_tdseedr1executorr(r/r)__init__Qsz          zTopology.__init__cCst}|jdur ||_n1||jkr>||_td|j|jD]}|q#|j Wdn1s9wY|j | WddS1sQwYdS)aStart monitoring, or restart after a fork. No effect if called multiple times. .. warning:: Topology is shared among multiple threads and is protected by mutual exclusion. Using Topology from a process other than the one that initialized it will emit a warning and may result in deadlock. To prevent this from happening, MongoClient must be created after any forking. NzMongoClient opened before fork. Create MongoClient only after forking. See PyMongo's documentation for details: https://pymongo.readthedocs.io/en/stable/faq.html#is-pymongo-fork-safe) osgetpidrMwarningswarnrJrLvaluesrSrOreset_ensure_opened)rXpidserverr(r(r)rTs      "z Topology.opencCst}|dur |jjS|Sr.)r remainingr=server_selection_timeout)rXtimeoutr(r(r)get_server_selection_timeoutsz%Topology.get_server_selection_timeoutNcs`|dur }n|}j|||}fdd|DWdS1s)wYdS)aLReturn a list of Servers matching selector, or time out. :Parameters: - `selector`: function that takes a list of Servers and returns a subset of them. - `server_selection_timeout` (optional): maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. - `address`: optional server address to select. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. Ncsg|]}|jqSr()get_server_by_addressaddress.0sdrXr(r) sz+Topology.select_servers..)rirJ_select_servers_loop)rXselectorrgrkZserver_timeoutrDr(ror)select_serverss $zTopology.select_serverscCst}||}|jj|||jjd}|sO|dks||kr*td||||jf| | |j t j|jt}|jj|||jjd}|r|j|S)z7select_servers() guts. Hold the lock when calling this.)Zcustom_selectorrz*%s, Timeout: %ss, Topology Description: %r)time monotonicr?Zapply_selectorr=Zserver_selectorr_error_message descriptionrc_request_check_allrKwaitrrPZcheck_compatible)rXrrrhrknowend_timerDr(r(r)rqs.    zTopology._select_servers_loopcCsJ||||}t|dkr|dSt|d\}}|jj|jjkr#|S|S)Nr)rslenrandomsamplepoolZoperation_count)rXrrrgrkserversZserver1Zserver2r(r(r)_select_servers zTopology._select_servercCs(||||}trt|jj|S)zALike select_servers, but choose a random server if several match.)rrZ get_timeoutZset_rttrwZround_trip_time)rXrrrgrkrer(r(r) select_serverszTopology.select_servercCs|t||S)aReturn a Server for "address", reconnecting if necessary. If the server's type is not known, request an immediate check of all servers. Time out after "server_selection_timeout" if the server cannot be reached. :Parameters: - `address`: A (host, port) pair. - `server_selection_timeout` (optional): maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. )rr)rXrkrgr(r(r)select_server_by_addresssz!Topology.select_server_by_addressFcCsR|j}|j|j}t||rdSt|j|}|js"|jr0|jtj kr0|j |j}|r0|j |js6|jo9||k}|jrV|sV|jdusFJ|j|jj|||j|jff||_|||j|jr~|s~|jdusoJ|j|jj||j|jff|jr|jtjkr|jjtvr|j|r|j |j}|r|j |jdS)ziProcess a new ServerDescription on an opened topology. Hold the lock when calling this. N) r?Z_server_descriptionsrk_is_stale_server_descriptionr Z is_readableZis_server_type_known topology_typerSinglerLgetrreadyr7r8r9r<r6Z"publish_server_description_changedr4_update_servers_receive_cluster_time_no_lock cluster_timerArUr@rrSrbrK notify_all)rXserver_description reset_pooltd_oldZsd_oldZnew_tdreZsuppress_eventr(r(r)_process_change)sT             zTopology._process_changecCsl|j)|jr|j|jr$|||WddSWddSWddS1s/wYdS)z>Process a new ServerDescription after an hello call completes.N)rJrFr? has_serverrkr)rXrrr(r(r) on_changefs   "zTopology.on_changecCsd|j}|jtvr dSt|j||_||jr0|jdusJ|j|jj ||j|j ffdSdS)z_Process a new seedlist on an opened topology. Hold the lock when calling this. N) r?rrrrr8r9r<r6rAr4)rXseedlistrr(r(r)_process_srv_updateus  zTopology._process_srv_updatecCsL|j|jr||WddSWddS1swYdS)z?Process a new list of nodes obtained from scanning SRV records.N)rJrFr)rXrr(r(r) on_srv_updates  "zTopology.on_srv_updatecCs |j|S)aJGet a Server or None. Returns the current version of the server immediately, even if it's Unknown or absent from the topology. Only use this in unittests. In driver code, use select_server_by_address, since then you're assured a recent view of the server's type and wire protocol version. )rLrrXrkr(r(r)rjs zTopology.get_server_by_addresscCs ||jvSr.)rLrr(r(r)rs zTopology.has_servercCs`|j#|jj}|tjkr WddSt|djWdS1s)wYdS)z!Return primary's address or None.Nr)rJr?rrReplicaSetWithPrimaryr_new_selectionrk)rXrr(r(r) get_primarys $zTopology.get_primarycCsp|j+|jj}|tjtjfvrtWdStdd||DWdS1s1wYdS)z+Return set of replica set member addresses.NcSsg|]}|jqSr()rkrlr(r(r)rpsz5Topology._get_replica_set_members..)rJr?rrrReplicaSetNoPrimarysetr)rXrrrr(r(r)_get_replica_set_memberss$z!Topology._get_replica_set_memberscC |tS)z"Return set of secondary addresses.)rrror(r(r)get_secondaries zTopology.get_secondariescCr)z Return set of arbiter addresses.)rrror(r(r) get_arbitersrzTopology.get_arbiterscC|jS)z1Return a document, the highest seen $clusterTime.rNror(r(r)max_cluster_timezTopology.max_cluster_timecCs.|r|jr|d|jdkr||_dSdSdS)NZ clusterTimerrXrr(r(r)rs z&Topology._receive_cluster_time_no_lockcCs6|j||WddS1swYdSr.)rJrrr(r(r)receive_cluster_times "zTopology.receive_cluster_timecCs@|j||j|WddS1swYdS)z=Wake all monitors, wait for at least one to check its server.N)rJrxrKry)rXZ wait_timer(r(r)request_check_alls"zTopology.request_check_allcCs|jjtjkr |jjS|jjS)z~Return a list of all data-bearing servers. This includes any server that might be selected for an operation. )r?rrr known_serversreadable_serversror(r(r)data_bearing_serversszTopology.data_bearing_serversc Csg}|j |D]}|j|j}|||jjfq Wdn1s(wY|D])\}}z|j|Wq/t yX}zt |d|dd}| |j j|d}~wwdS)NrF) rJrrLrkappendrgenZ get_overallZremove_stale_socketsr _ErrorContext handle_errorrw)rXrrnreZ generationexcctxr(r(r) update_pools"   zTopology.update_poolcCs|j?|jD]}|q |j|_|jD]\}}||jvr,||j|_q|j r5|j d|_ d|_ Wdn1sEwY|j r`|j dusTJ|j |jj|jff|jsf|j rm|jdSdS)zClear pools and terminate monitors. Topology does not reopen on demand. Any further operations will raise :exc:`~.errors.InvalidOperation`.FTN)rJrLrarSr?rbrDitemsrwrUrFrGr8r9r<r6Zpublish_topology_closedr4r7r:)rXrerkrnr(r(r)rSs&      zTopology.closecCrr.)r?ror(r(r)rwrzTopology.descriptioncCs4|j |jWdS1swYdS)z"Pop all session ids from the pool.N)rJrOpop_allror(r(r)pop_all_sessionss$zTopology.pop_all_sessionscCs4|j |WddS1swYdSr.)rJ_check_session_supportror(r(r)_check_implicit_session_supports "z(Topology._check_implicit_session_supportcCs|jjrtdS|jj}|dur>|jjtjkr%|jjs$| t | dn |jj s2| t | d|jj}|dur>td|S)z/Internal check for session support on clusters.infNz5Sessions are not supported by this MongoDB deployment)r=rWfloatr?logical_session_timeout_minutesrrrZhas_known_serversrqrrirrrrXZsession_timeoutr(r(r)rs$  zTopology._check_session_supportcCs>|j|}|j|WdS1swYdS)z>Start or resume a server session, or raise ConfigurationError.N)rJrrOget_server_sessionrr(r(r)r3s $zTopology.get_server_sessioncCsR|r!|j|j||jjWddS1swYdS|j|dSr.)rJrOreturn_server_sessionr?rZreturn_server_session_no_lock)rXZserver_sessionlockr(r(r)r9s"zTopology.return_server_sessioncCs t|jS)zmA Selection object, initially including all known servers. Hold the lock when calling this. )rZfrom_topology_descriptionr?ror(r(r)rCs zTopology._new_selectionc Cs|jrtd|js@d|_||js|jr|j|jr*|j j t vr*|j|j j r@|t|jdtd|jdd|jD]}|qEdS)z[Start monitors, or restart after a fork. Hold the lock when calling this. z"Cannot use MongoClient after closeTrr| )okZ serviceIdZmaxWireVersionN)rGr rFrr8r7r:rTrUrwrrr=rWrrrErr4rLrarXrer(r(r)rcJs&    zTopology._ensure_openedcCsp|j|}|dur dS|j|j|jrdS|jj}|j}d}|r3t |dr3t |j t r3|j d}t ||S)NTdetailsZtopologyVersion)rLr_poolZstale_generationsock_generation service_idrwtopology_versionerrorhasattr isinstancerdict _is_stale_error_topology_version)rXrkerr_ctxreZcur_tvrerror_tvr(r(r)_is_stale_errorks    zTopology._is_stale_errorc Csj|||rdS|j|}|j}t|}|j}|jjr"|s"|js"dSt|t r,|jr,dSt|t r3dSt|t t frt |drC|j}nt|t rJdnd}|jd|}|tjvr||tjv} |jjsj|t||d| sq|jdkrv|||dS|js|jjs|t||d||dSdSt|tr|jjs|t||d|||jdSdS)Ncodei{'r)rrLrtyperr=rWcompleted_handshake issubclassr rr r rrrrrrZ_NOT_PRIMARY_CODESZ_SHUTDOWN_CODESrrmax_wire_versionrb request_checkr _monitorZ cancel_check) rXrkrrerexc_typerZerr_codedefaultZis_shutting_downr(r(r) _handle_errorsF          zTopology._handle_errorcCs8|j|||WddS1swYdS)zHandle an application error. May reset the server to Unknown, clear the pool, and request an immediate check depending on the error and the context. N)rJr)rXrkrr(r(r)rs"zTopology.handle_errorcCs|jD]}|qdS)z3Wake all monitors. Hold the lock when calling this.N)rLrarrr(r(r)rxs zTopology._request_check_allc Cs|jD]W\}}||jvrB|jj|||||jd}d}|jr)t |j }t || |||j |j|d}||j|<|q|j|jj}||j|_||jkr^|j|j|jqt|jD]\}}|j|sz||j|qfdS)zrSync our Servers from TopologyDescription.server_descriptions. Hold the lock while calling this. )rZtopologyrrYN)rrmonitorZ topology_idZ listenersevents)r?rDrrLr=Z monitor_class_create_pool_for_monitorr7rQrRr9r_create_pool_for_serverr4r6rTrw is_writablerZupdate_is_writablerCrrSpop)rXrkrnrr0reZ was_writabler(r(r)rs@       zTopology._update_serverscCs|j||jjSr.)r= pool_class pool_optionsrr(r(r)rsz Topology._create_pool_for_serverc CsD|jj}t|j|j|j|j|j|j|jd|j d }|jj ||ddS)NF) connect_timeoutsocket_timeout ssl_contexttls_allow_invalid_hostnamesZevent_listenersappnamedriverZ pause_enabled server_api) handshake) r=rrr _ssl_contextrr5rrrr)rXrkoptionsZmonitor_pool_optionsr(r(r)rs z!Topology._create_pool_for_monitorcs|jjtjtjfv}|rd}n |jjtjkrd}nd}|jjr1|tur+|r'dSd|Sd||fSt|j }t|j }|sQ|rMd||j j fSd|S|d j tfd d |d d D}|rd urod|S|r}t||js}d|StSddd |DS)zeFormat an error message if server selection fails. Hold the lock when calling this. zreplica set membersZmongosesrzNo primary available for writeszNo %s available for writeszNo %s match selector "%s"z)No %s available for replica set name "%s"zNo %s availablerc3s|]}|jkVqdSr.rrmrerr(r) :sz*Topology._error_message..r|NzNo %s found yetz\Could not reach any servers in %s. Replica set is configured with internal hostnames or IPs?,css |] }|jrt|jVqdSr.)rstrrr(r(r)rIs)r?rrrrZShardedrrrCrDrar=r>rallr intersectionrErjoin)rXrrZis_replica_setZ server_plural addressesrZsamer(rr)rvsH  zTopology._error_messagecCs"d}|jsd}d|jj||jfS)NzCLOSED z <%s %s%r>)rF __class____name__r?)rXmsgr(r(r)__repr__KszTopology.__repr__cCs"|j}tt|j|j|j|jfS)z?The properties to use for MongoClient/Topology equality checks.)r=tuplesortedrBr>rVZsrv_service_name)rXtsr(r(r)eq_propsQszTopology.eq_propscCs t||jr||kStSr.)rrrNotImplemented)rXotherr(r(r)__eq__Vs zTopology.__eq__cCs t|Sr.)hashrror(r(r)__hash__[s zTopology.__hash__)NNr.)F)r)2r __module__ __qualname____doc__r\rTrirsrqrrrrrrrrjrrrrrrrrrrrrSpropertyrwrrrrrrrcrrrrxrrrrvrrrrr(r(r(r)r+Ns`O"  !   =       !C *: r+c@seZdZdZddZdS)rz.An error with context for SDAM error handling.cCs"||_||_||_||_||_dSr.)rrrrr)rXrrrrrr(r(r)r\bs  z_ErrorContext.__init__N)rrrrr\r(r(r(r)r_s rcCs8|dus|dur dS|d|dkrdS|d|dkS)z9Return True if the error's topologyVersion is <= current.NF processIdcounterr() current_tvrr(r(r)rjs rcCsF|j|j}}|dus|durdS|d|dkrdS|d|dkS)z4Return True if the new topologyVersion is < current.NFr r )r)Z current_sdZnew_sdr Znew_tvr(r(r)rss r)8rr]r"rrHrtr_rQtypingrZpymongorrrrZpymongo.client_sessionrZpymongo.errorsrr r r r r rrrZ pymongo.hellorZpymongo.monitorrZ pymongo.poolrZpymongo.serverrZpymongo.server_descriptionrZpymongo.server_selectorsrrrrrrZpymongo.topology_descriptionrrrrr r*objectr+rrrr(r(r(r)s<  ,