o Sb J@s>dZddlZddlmZddlmZmZddlmZddl m Z ddl m Z ddl mZmZdd lmZdd lmZdd lmZmZmZdd lmZmZmZmZdd lmZmZddl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&ddl'm(Z(ddl)m*Z*dZ+dZ,dZ-dZ.dZ/dZ0Gddde1Z2ddZ3dedefddZ4Gddde1Z5dS) zsz)_raise_bulk_write_error..)key)sortr)rEr*r*r+_raise_bulk_write_errorsrQc@seZdZdZd ddZeddZddZ   d!d d Zd"d d Z d ddZ ddZ ddZ d#ddZ ddZddZddZddZddZdS)$_Bulkz'The private guts of the bulk write API.NcCs|j|jjdtdd|_||_|jdurtd|j||_||_ g|_ d|_ ||_ d|_ d|_d|_d|_d|_d|_d|_d|_d|_dS)zInitialize a _Bulk instance.replace)Zunicode_decode_error_handlerZdocument_class) codec_optionsNletFT)Z with_optionsrT_replacedict collectionrUr r commentorderedr'executedbypass_doc_valuses_collationuses_array_filtersuses_hint_updateuses_hint_delete is_retryableretryingstarted_retryable_write current_runnext_run)r)rXrZZbypass_document_validationrYrUr*r*r+r,s,  z_Bulk.__init__cCs|jjjj}|r |js tStSN)rXdatabaseclientZ _encrypterZ_bypass_auto_encryptionrr)r)Z encrypterr*r*r+bulk_ctx_classs  z_Bulk.bulk_ctx_classcCs:td|t|tsd|vst|d<|jt|fdS)z*Add an insert document to the list of ops.documentZ_idN)r isinstancerrr'r/r)r)rjr*r*r+ add_inserts  z_Bulk.add_insertFc Cst|td|fd|fd|fd|fg}t|}|dur#d|_||d<|dur.d|_||d<|dur9d|_||d <|r>d |_|jt |fdS) z8Create an update document and add it to the list of ops.qumultiupsertNT collationZ arrayFiltershintF) rrr r]r^r_rar'r/r) r)selectorr"rorprqZ array_filtersrrcmdr*r*r+ add_updates z_Bulk.add_updatecCslt|td|fd|fdd|fg}t|}|dur!d|_||d<|dur,d|_||d<|jt|fdS) z8Create a replace document and add it to the list of ops.rmrn)roFrpNTrqrr)r rr r]r_r'r/r)r)rsrIrprqrrrtr*r*r+ add_replacesz_Bulk.add_replacecCsjtd|fd|fg}t|}|durd|_||d<|dur$d|_||d<|tkr+d|_|jt|fdS)z7Create a delete document and add it to the list of ops.rmlimitNTrqrrF) rr r]r` _DELETE_ALLrar'r/r)r)rsrwrqrrrtr*r*r+ add_deletesz_Bulk.add_deleteccs^d}t|jD]!\}\}}|durt|}n |j|kr#|Vt|}|||q|VdS)ziGenerate batches of operations, batched by type of operation, in the order **provided**. N) enumerater'r$r%r1)r)rDr-r%r0r*r*r+ gen_ordereds   z_Bulk.gen_orderedccsZttttttg}t|jD]\}\}}||||q|D]}|jr*|Vq"dS)zbGenerate batches of operations, batched by type of operation, in arbitrary order. N)r$rrrrzr'r1)r) operationsr-r%r0rDr*r*r+ gen_unorderedsz_Bulk.gen_unorderedc  Csv|jjj} |jjj} | j} |jst||_d|_|j} || |d} | r9|j s6t|d|_|jdur6d} t | j }| | |||| || j |jj }| jt| jkr$| rct| j| jdkrc|pb|}t||jjfd|jfg}|jrx|j|d<t|||jrd|d<|jdur| j ttfvr|j|d<|r|r|js|d|_|||tj||||| | ||!| |t"| j| jd}|j#r |$||| \}}|%di}|%d d t&vrt'(|}t)| || j|t*|t)| || j|d|_ d|_|jr d |vr nn|+||| }| jt|7_| jt| jksS|jr/|d r/dS|j|_} | s&dSdS) NFTrrZrYZbypassDocumentValidationrUZwriteConcernErrorcoderr=),rXrgnamerh_event_listenersrdnextreZvalidate_sessionrb _COMMANDSr%rirTr(rAr'rrZrYrZapply_write_concernr\rUrrrcZ_start_retryable_writeZ _apply_torZPRIMARYZsend_cluster_timeadd_server_apiZ apply_timeoutr acknowledgedexecuter@rrCdeepcopyrKrQ execute_unack)r) generator write_concernsession sock_infoop_id retryablerEZfinal_write_concerndb_namerh listenersrDZlast_runcmd_namebwcrtr'rGto_sendrJfullr*r*r+_execute_command s               7  z_Bulk._execute_commandc sggdddddgdtfdd}jjj}||}|j||Wdn1s6wYdsCdrGtS)zExecute using write commands.rr=r?r7r:r;r<r8r9c s|||dSrf)r)rrrrErrr)rr*r+retryable_bulksz-_Bulk.execute_command..retryable_bulkNr=r?)rrXrgrhZ _tmp_sessionZ_retry_with_sessionrarQ)r)rrrrrhsr*rr+execute_commandqs$   z_Bulk.execute_commandc Cs|jjj}|jjj}|j}t}|jst||_|j}|ryt|j }| |||||d|j |jj } |j t |jkrmt||jjfddddifg} || t|j|j d} | | | |} |j t | 7_ |j t |jks9t|d|_}|sdSdS)zCExecute write commands with OP_MSG and w=0 writeConcern, unordered.N)rZFZ writeConcernwr)rXrgrrhrrrdrrr%rirTr(rAr'rrrr) r)rrrrhrrrDrrrtr'rr*r*r+execute_op_msg_no_resultssB         z_Bulk.execute_op_msg_no_resultsc CsVggdddddgd}t}t}z|||d||d||WdSty*YdSw)zAExecute write commands with OP_MSG and w=0 WriteConcern, ordered.rrNF)rrrr)r)rrrrEZinitial_write_concernrr*r*r+execute_command_no_resultss2  z _Bulk.execute_command_no_resultscCs|jrtd|jrtd|o|j }|r"|jr"|jdkr"td|r0|jr0|jdkr0td|jr7td|j rA| |||S| ||S)z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes. zPMust be connected to MongoDB 4.4+ to use hint on unacknowledged delete commands.rzPMust be connected to MongoDB 4.2+ to use hint on unacknowledged update commands.zGCannot set bypass_document_validation with unacknowledged write concern) r]rr^rr`Zmax_wire_versionr_r\rrZrr)r)rrrZunackr*r*r+execute_no_resultss(  z_Bulk.execute_no_resultscCs|jstd|jrtdd|_|p|jj}t||}|jr$|}n|}|jj j }|j sO| |}| |||WddS1sHwYdS||||S)zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TN)r'rr[rXrr rZr{r}rgrhrZ_socket_for_writesrr)r)rrrrhrr*r*r+rs      "z _Bulk.execute)NN)FFNNN)FNNrf)r2r3r4r5r,propertyrirlrurvryr{r}rrrrrrr*r*r*r+rRs.      g' rR)6r5rC itertoolsrtypingrrZ bson.objectidrZ bson.raw_bsonrZbson.sonrZpymongorr Zpymongo.client_sessionr Zpymongo.collationr Zpymongo.commonr r rZpymongo.errorsrrrrZpymongo.helpersrrZpymongo.messagerrrrrrZpymongo.read_preferencesrZpymongo.write_concernrrxZ _DELETE_ONEZ _BAD_VALUEZ_UNKNOWN_ERRORZ_WRITE_CONCERN_ERRORrobjectr$rKrQrRr*r*r*r+s4         '