lib/kafka/transaction_manager.rb in ruby-kafka-1.2.0 vs lib/kafka/transaction_manager.rb in ruby-kafka-1.3.0

- old
+ new

@@ -93,11 +93,11 @@ def add_partitions_to_transaction(topic_partitions) force_transactional! if @transaction_state.uninitialized? - raise 'Transaction is uninitialized' + raise Kafka::InvalidTxnStateError, 'Transaction is uninitialized' end # Extract newly created partitions new_topic_partitions = {} topic_partitions.each do |topic, partitions| @@ -136,12 +136,12 @@ raise end def begin_transaction force_transactional! - raise 'Transaction has already started' if @transaction_state.in_transaction? - raise 'Transaction is not ready' unless @transaction_state.ready? + raise Kafka::InvalidTxnStateError, 'Transaction has already started' if @transaction_state.in_transaction? + raise Kafka::InvalidTxnStateError, 'Transaction is not ready' unless @transaction_state.ready? @transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION) @logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" nil @@ -157,11 +157,11 @@ @logger.warn("Transaction is being committed") return end unless @transaction_state.in_transaction? - raise 'Transaction is not valid to commit' + raise Kafka::InvalidTxnStateError, 'Transaction is not valid to commit' end @transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION) @logger.info "Commiting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" @@ -190,11 +190,12 @@ @logger.warn("Transaction is being aborted") return end unless @transaction_state.in_transaction? - raise 'Transaction is not valid to abort' + @logger.warn('Aborting transaction that was never opened on brokers') + return end @transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION) @logger.info "Aborting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" @@ -219,11 +220,11 @@ def send_offsets_to_txn(offsets:, group_id:) force_transactional! unless @transaction_state.in_transaction? - raise 'Transaction is not valid to send offsets' + raise Kafka::InvalidTxnStateError, 'Transaction is not valid to send offsets' end add_response = transaction_coordinator.add_offsets_to_txn( transactional_id: @transactional_id, producer_id: @producer_id, @@ -248,10 +249,14 @@ def error? @transaction_state.error? end + def ready? + @transaction_state.ready? + end + def close if in_transaction? @logger.warn("Aborting pending transaction ...") abort_transaction elsif @transaction_state.aborting_transaction? || @transaction_state.committing_transaction? @@ -262,14 +267,14 @@ private def force_transactional! unless transactional? - raise 'Please turn on transactional mode to use transaction' + raise Kafka::InvalidTxnStateError, 'Please turn on transactional mode to use transaction' end if @transactional_id.nil? || @transactional_id.empty? - raise 'Please provide a transaction_id to use transactional mode' + raise Kafka::InvalidTxnStateError, 'Please provide a transaction_id to use transactional mode' end end def transaction_coordinator @cluster.get_transaction_coordinator(