lib/kafka/transaction_manager.rb in ruby-kafka-1.3.0 vs lib/kafka/transaction_manager.rb in ruby-kafka-1.4.0

- old
+ new

@@ -231,18 +231,27 @@ producer_epoch: @producer_epoch, group_id: group_id ) Protocol.handle_error(add_response.error_code) - send_response = transaction_coordinator.txn_offset_commit( + send_response = group_coordinator(group_id: group_id).txn_offset_commit( transactional_id: @transactional_id, group_id: group_id, producer_id: @producer_id, producer_epoch: @producer_epoch, offsets: offsets ) - Protocol.handle_error(send_response.error_code) + send_response.errors.each do |tp| + tp.partitions.each do |partition| + Protocol.handle_error(partition.error_code) + end + end + + nil + rescue + @transaction_state.transition_to!(TransactionStateMachine::ERROR) + raise end def in_transaction? @transaction_state.in_transaction? end @@ -278,9 +287,15 @@ end def transaction_coordinator @cluster.get_transaction_coordinator( transactional_id: @transactional_id + ) + end + + def group_coordinator(group_id:) + @cluster.get_group_coordinator( + group_id: group_id ) end def complete_transaction @transaction_state.transition_to!(TransactionStateMachine::READY)