/* * Copyright 2021-Present Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include "attempt_context_testing_hooks.hxx" #include "couchbase/codec/encoded_value.hxx" #include "error_list.hxx" #include "waitable_op_list.hxx" #include "async_attempt_context.hxx" #include "attempt_context.hxx" #include "attempt_state.hxx" #include "internal/atr_cleanup_entry.hxx" #include "internal/exceptions_internal.hxx" #include "internal/transaction_context.hxx" #include "transaction_get_result.hxx" #include #include #include #include #include // implemented in core::impl::query, to take advantage of the statics over there namespace couchbase::core { class cluster; namespace impl { auto build_transaction_query_result(operations::query_response resp, std::error_code ec = {}) -> std::pair; auto build_transaction_query_request(couchbase::query_options::built opts) -> core::operations::query_request; } // namespace impl namespace transactions { /** * Provides methods to allow an application's transaction logic to read, mutate, * insert and delete documents, as well as commit or rollback the transaction. */ class transactions; enum class forward_compat_stage; class staged_mutation_queue; class staged_mutation; class attempt_context_impl : public attempt_context , public couchbase::transactions::attempt_context , public async_attempt_context , public couchbase::transactions::async_attempt_context , public std::enable_shared_from_this { private: std::shared_ptr overall_; std::optional atr_id_; bool is_done_{ false }; std::unique_ptr staged_mutations_; attempt_context_testing_hooks& hooks_; error_list errors_; std::mutex mutex_; waitable_op_list op_list_; std::string query_context_; // commit needs to access the hooks friend class staged_mutation_queue; // entry needs access to private members friend class atr_cleanup_entry; // transaction_context needs access to the two functions below friend class transaction_context; auto insert_raw(const collection& coll, const std::string& id, codec::encoded_value content) -> std::pair override; auto insert_raw(const core::document_id& id, codec::encoded_value content) -> core::transactions::transaction_get_result override; void insert_raw(const collection& coll, std::string id, codec::encoded_value content, couchbase::transactions::async_result_handler&& handler) override; void insert_raw( const core::document_id& id, codec::encoded_value content, std::function)>&& cb) override; auto replace_raw(const couchbase::transactions::transaction_get_result& doc, codec::encoded_value content) -> std::pair override; auto replace_raw(const transaction_get_result& document, codec::encoded_value content) -> transaction_get_result override; void replace_raw(couchbase::transactions::transaction_get_result doc, codec::encoded_value content, couchbase::transactions::async_result_handler&& handler) override; void replace_raw( const transaction_get_result& document, codec::encoded_value content, std::function)>&& cb) override; void remove_staged_insert(const core::document_id& id, VoidCallback&& cb); void get_with_query( const core::document_id& id, bool optional, std::function)>&& cb); void insert_raw_with_query( const core::document_id& id, codec::encoded_value content, std::function)>&& cb); void replace_raw_with_query( const transaction_get_result& document, codec::encoded_value content, std::function)>&& cb); void remove_with_query(const transaction_get_result& document, VoidCallback&& cb); void commit_with_query(VoidCallback&& cb); void rollback_with_query(VoidCallback&& cb); void query_begin_work(std::optional query_context, VoidCallback&& cb); void do_query(const std::string& statement, const couchbase::transactions::transaction_query_options& opts, std::optional query_context, QueryCallback&& cb); auto handle_query_error(const core::operations::query_response& resp) -> std::exception_ptr; void wrap_query(const std::string& statement, const couchbase::transactions::transaction_query_options& opts, std::vector params, const tao::json::value& txdata, const std::string& hook_point, bool check_expiry, std::optional query_context, std::function&& cb); void handle_err_from_callback(std::exception_ptr e); template void op_completed_with_callback(Cb&& cb, std::optional t) { try { op_list_.decrement_in_flight(); cb({}, t); op_list_.decrement_ops(); } catch (...) { handle_err_from_callback(std::current_exception()); } } template void op_completed_with_callback(Cb&& cb) { try { op_list_.decrement_in_flight(); cb({}); op_list_.decrement_ops(); } catch (...) { handle_err_from_callback(std::current_exception()); } } template void op_completed_with_error(ErrorHandler&& cb, ExceptionType&& err) { return op_completed_with_error(std::forward(cb), std::make_exception_ptr(std::forward(err))); } void op_completed_with_error(VoidCallback cb, std::exception_ptr err); template void op_completed_with_error(std::function)> cb, std::exception_ptr&& err) { try { std::rethrow_exception(std::move(err)); } catch (const transaction_operation_failed& e) { // if this is a transaction_operation_failed, we need to cache it before // moving on... errors_.push_back(e); try { op_list_.decrement_in_flight(); cb(std::current_exception(), std::optional()); op_list_.decrement_ops(); } catch (...) { handle_err_from_callback(std::current_exception()); } } catch (...) { try { op_list_.decrement_in_flight(); cb(std::current_exception(), std::optional()); op_list_.decrement_ops(); } catch (...) { handle_err_from_callback(std::current_exception()); } } } template void op_completed_with_error_no_cache( std::function)>&& cb, std::exception_ptr err) { try { cb(err, std::optional()); } catch (...) { // eat it. } } void op_completed_with_error_no_cache(VoidCallback&& cb, std::exception_ptr err) { try { cb(std::move(err)); } catch (...) { // just eat it. } } template void cache_error_async(Handler cb, std::function func) { try { op_list_.increment_ops(); existing_error(); return func(); } catch (const async_operation_conflict& e) { CB_ATTEMPT_CTX_LOG_ERROR(this, "Attempted to perform txn operation after " "commit/rollback started: {}", e.what()); // you cannot call op_completed_with_error, as it tries to decrement // the op count, however it didn't successfully increment it, so... auto err = transaction_operation_failed(FAIL_OTHER, "async operation conflict"); switch (state()) { case attempt_state::ABORTED: case attempt_state::ROLLED_BACK: err.cause(TRANSACTION_ALREADY_ABORTED); break; case attempt_state::COMMITTED: case attempt_state::COMPLETED: err.cause(TRANSACTION_ALREADY_COMMITTED); break; default: err.cause(UNKNOWN); } op_completed_with_error_no_cache(std::move(cb), std::make_exception_ptr(err)); } catch (const transaction_operation_failed& e) { // thrown only from call_func when previous error exists, so eat it, // unless it has PREVIOUS_OP_FAILED or FEATURE_NOT_AVAILABLE_EXCEPTION // cause if (e.cause() == PREVIOUS_OPERATION_FAILED || e.cause() == FEATURE_NOT_AVAILABLE_EXCEPTION) { op_completed_with_error(std::move(cb), e); } } catch (const op_exception& e) { op_completed_with_error(std::move(cb), e); } catch (const std::exception& e) { op_completed_with_error(std::move(cb), transaction_operation_failed(FAIL_OTHER, e.what())); } } [[nodiscard]] auto cluster_ref() const -> const core::cluster&; explicit attempt_context_impl(std::shared_ptr transaction_ctx); public: static auto create(std::shared_ptr transaction_ctx) -> std::shared_ptr; ~attempt_context_impl() override; attempt_context_impl(attempt_context_impl&) = delete; attempt_context_impl(attempt_context_impl&&) = delete; auto operator=(attempt_context_impl&) -> attempt_context_impl& = delete; auto operator=(attempt_context_impl&&) -> attempt_context_impl& = delete; auto get(const core::document_id& id) -> transaction_get_result override; auto get(const couchbase::collection& coll, const std::string& id) -> std::pair override; void get(const couchbase::collection& coll, std::string id, couchbase::transactions::async_result_handler&& handler) override; void get( const core::document_id& id, std::function)>&& cb) override; auto get_optional(const core::document_id& id) -> std::optional override; void get_optional( const core::document_id& id, std::function)>&& cb) override; auto get_replica_from_preferred_server_group(const core::document_id& id) -> std::optional override; void get_replica_from_preferred_server_group( const core::document_id& id, std::function)>&& cb) override; auto get_replica_from_preferred_server_group(const couchbase::collection& coll, const std::string& id) -> std::pair override; void get_replica_from_preferred_server_group( const couchbase::collection& coll, const std::string& id, couchbase::transactions::async_result_handler&& handler) override; void remove(const transaction_get_result& document) override; auto remove(const couchbase::transactions::transaction_get_result& doc) -> couchbase::error override; void remove(const transaction_get_result& document, VoidCallback&& cb) override; void remove(couchbase::transactions::transaction_get_result doc, couchbase::transactions::async_err_handler&& handler) override; auto do_core_query(const std::string& statement, const couchbase::transactions::transaction_query_options& options, std::optional query_context) -> core::operations::query_response override; auto do_public_query(const std::string& statement, const couchbase::transactions::transaction_query_options& opts, std::optional query_context) -> std::pair override; void query(const std::string& statement, const couchbase::transactions::transaction_query_options& options, std::optional query_context, QueryCallback&& cb) override; void query(std::string statement, couchbase::transactions::transaction_query_options opts, std::optional query_context, couchbase::transactions::async_query_handler&& handler) override; void commit() override; void commit(VoidCallback&& cb) override; void rollback() override; void rollback(VoidCallback&& cb) override; void existing_error(bool prev_op_failed = true); [[nodiscard]] auto is_done() const -> bool { return is_done_; } [[nodiscard]] auto overall() -> std::shared_ptr { return overall_; } [[nodiscard]] auto transaction_id() const -> const std::string& { return overall_->transaction_id(); } [[nodiscard]] auto id() const -> const std::string& { return overall_->current_attempt().id; } [[nodiscard]] auto state() -> attempt_state { return overall_->current_attempt().state; } void state(attempt_state s) { overall_->current_attempt_state(s); } [[nodiscard]] auto atr_id() const -> const std::string& { return overall_->atr_id(); } void atr_id(const std::string& atr_id) { overall_->atr_id(atr_id); } [[nodiscard]] auto atr_collection() const -> const std::string& { return overall_->atr_collection(); } void atr_collection_name(const std::string& coll) { overall_->atr_collection(coll); } auto has_expired_client_side(std::string place, std::optional doc_id) -> bool; private: std::atomic expiry_overtime_mode_{ false }; auto check_expiry_pre_commit(std::string stage, std::optional doc_id) -> bool; void check_expiry_during_commit_or_rollback(const std::string& stage, std::optional doc_id); void set_atr_pending_locked( const core::document_id& id, std::unique_lock&& lock, std::function)>&& fn); auto error_if_expired_and_not_in_overtime(const std::string& stage, std::optional doc_id) -> std::optional; auto check_for_own_write(const core::document_id& id) -> staged_mutation*; void check_and_handle_blocking_transactions( const transaction_get_result& doc, forward_compat_stage stage, std::function)>&& cb); template void check_atr_entry_for_blocking_document(const transaction_get_result& doc, Delay delay, Handler&& cb); template void check_if_done(Handler& cb); void atr_commit(bool ambiguity_resolution_mode); void atr_commit_ambiguity_resolution(); void atr_complete(); void atr_abort(); void atr_rollback_complete(); void select_atr_if_needed_unlocked( const core::document_id& id, std::function)>&& cb); template void do_get(const core::document_id& id, bool allow_replica, std::optional resolving_missing_atr_entry, Handler&& cb); void get_doc(const core::document_id& id, bool allow_replica, std::function, std::optional, std::optional)>&& cb); auto create_document_metadata(const std::string& operation_type, const std::string& operation_id, const std::optional& document_metadata, std::uint32_t user_flags_to_stage) -> tao::json::value; template void create_staged_insert(const core::document_id& id, codec::encoded_value content, uint64_t cas, Delay&& delay, const std::string& op_id, Handler&& cb); template void create_staged_replace(const transaction_get_result& document, codec::encoded_value content, const std::string& op_id, Handler&& cb); template void create_staged_insert_error_handler(const core::document_id& id, codec::encoded_value content, uint64_t cas, Delay&& delay, const std::string& op_id, Handler&& cb, error_class ec, external_exception cause, const std::string& message); void ensure_open_bucket(const std::string& bucket_name, std::function&& handler); }; } // namespace transactions } // namespace couchbase::core