lib/kafka/transaction_manager.rb in ruby-kafka-1.2.0 vs lib/kafka/transaction_manager.rb in ruby-kafka-1.3.0
- old
+ new
@@ -93,11 +93,11 @@
def add_partitions_to_transaction(topic_partitions)
force_transactional!
if @transaction_state.uninitialized?
- raise 'Transaction is uninitialized'
+ raise Kafka::InvalidTxnStateError, 'Transaction is uninitialized'
end
# Extract newly created partitions
new_topic_partitions = {}
topic_partitions.each do |topic, partitions|
@@ -136,12 +136,12 @@
raise
end
def begin_transaction
force_transactional!
- raise 'Transaction has already started' if @transaction_state.in_transaction?
- raise 'Transaction is not ready' unless @transaction_state.ready?
+ raise Kafka::InvalidTxnStateError, 'Transaction has already started' if @transaction_state.in_transaction?
+ raise Kafka::InvalidTxnStateError, 'Transaction is not ready' unless @transaction_state.ready?
@transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION)
@logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"
nil
@@ -157,11 +157,11 @@
@logger.warn("Transaction is being committed")
return
end
unless @transaction_state.in_transaction?
- raise 'Transaction is not valid to commit'
+ raise Kafka::InvalidTxnStateError, 'Transaction is not valid to commit'
end
@transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION)
@logger.info "Commiting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"
@@ -190,11 +190,12 @@
@logger.warn("Transaction is being aborted")
return
end
unless @transaction_state.in_transaction?
- raise 'Transaction is not valid to abort'
+ @logger.warn('Aborting transaction that was never opened on brokers')
+ return
end
@transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION)
@logger.info "Aborting transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"
@@ -219,11 +220,11 @@
def send_offsets_to_txn(offsets:, group_id:)
force_transactional!
unless @transaction_state.in_transaction?
- raise 'Transaction is not valid to send offsets'
+ raise Kafka::InvalidTxnStateError, 'Transaction is not valid to send offsets'
end
add_response = transaction_coordinator.add_offsets_to_txn(
transactional_id: @transactional_id,
producer_id: @producer_id,
@@ -248,10 +249,14 @@
def error?
@transaction_state.error?
end
+ def ready?
+ @transaction_state.ready?
+ end
+
def close
if in_transaction?
@logger.warn("Aborting pending transaction ...")
abort_transaction
elsif @transaction_state.aborting_transaction? || @transaction_state.committing_transaction?
@@ -262,14 +267,14 @@
private
def force_transactional!
unless transactional?
- raise 'Please turn on transactional mode to use transaction'
+ raise Kafka::InvalidTxnStateError, 'Please turn on transactional mode to use transaction'
end
if @transactional_id.nil? || @transactional_id.empty?
- raise 'Please provide a transaction_id to use transactional mode'
+ raise Kafka::InvalidTxnStateError, 'Please provide a transaction_id to use transactional mode'
end
end
def transaction_coordinator
@cluster.get_transaction_coordinator(