lib/kafka/transaction_manager.rb in ruby-kafka-1.3.0 vs lib/kafka/transaction_manager.rb in ruby-kafka-1.4.0
- old
+ new
@@ -231,18 +231,27 @@
producer_epoch: @producer_epoch,
group_id: group_id
)
Protocol.handle_error(add_response.error_code)
- send_response = transaction_coordinator.txn_offset_commit(
+ send_response = group_coordinator(group_id: group_id).txn_offset_commit(
transactional_id: @transactional_id,
group_id: group_id,
producer_id: @producer_id,
producer_epoch: @producer_epoch,
offsets: offsets
)
- Protocol.handle_error(send_response.error_code)
+ send_response.errors.each do |tp|
+ tp.partitions.each do |partition|
+ Protocol.handle_error(partition.error_code)
+ end
+ end
+
+ nil
+ rescue
+ @transaction_state.transition_to!(TransactionStateMachine::ERROR)
+ raise
end
def in_transaction?
@transaction_state.in_transaction?
end
@@ -278,9 +287,15 @@
end
def transaction_coordinator
@cluster.get_transaction_coordinator(
transactional_id: @transactional_id
+ )
+ end
+
+ def group_coordinator(group_id:)
+ @cluster.get_group_coordinator(
+ group_id: group_id
)
end
def complete_transaction
@transaction_state.transition_to!(TransactionStateMachine::READY)