lib/kafka/cluster.rb in ruby-kafka-0.7.0 vs lib/kafka/cluster.rb in ruby-kafka-0.7.1.beta1

- old
+ new

@@ -109,52 +109,36 @@ # @return [Broker] the broker that's currently leader. def get_leader(topic, partition) connect_to_broker(get_leader_id(topic, partition)) end + # Finds the broker acting as the coordinator of the given group. + # + # @param group_id: [String] + # @return [Broker] the broker that's currently coordinator. def get_group_coordinator(group_id:) @logger.debug "Getting group coordinator for `#{group_id}`" - refresh_metadata_if_necessary! + get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_GROUP, group_id) + end - cluster_info.brokers.each do |broker_info| - begin - broker = connect_to_broker(broker_info.node_id) - response = broker.find_coordinator( - coordinator_type: Kafka::Protocol::COORDINATOR_TYPE_GROUP, - coordinator_key: group_id - ) + # Finds the broker acting as the coordinator of the given transaction. + # + # @param transactional_id: [String] + # @return [Broker] the broker that's currently coordinator. + def get_transaction_coordinator(transactional_id:) + @logger.debug "Getting transaction coordinator for `#{transactional_id}`" - Protocol.handle_error(response.error_code, response.error_message) + refresh_metadata_if_necessary! - coordinator_id = response.coordinator_id - - @logger.debug "Coordinator for group `#{group_id}` is #{coordinator_id}. Connecting..." - - # It's possible that a new broker is introduced to the cluster and - # becomes the coordinator before we have a chance to refresh_metadata. - coordinator = begin - connect_to_broker(coordinator_id) - rescue Kafka::NoSuchBroker - @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing" - refresh_metadata! - connect_to_broker(coordinator_id) - end - - @logger.debug "Connected to coordinator: #{coordinator} for group `#{group_id}`" - - return coordinator - rescue CoordinatorNotAvailable - @logger.debug "Coordinator not available; retrying in 1s" - sleep 1 - retry - rescue ConnectionError => e - @logger.error "Failed to get group coordinator info from #{broker}: #{e}" - end + if transactional_id.nil? + # Get a random_broker + @logger.debug "Transaction ID is not available. Choose a random broker." + return random_broker + else + get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_TRANSACTION, transactional_id) end - - raise Kafka::Error, "Failed to find group coordinator" end def partitions_for(topic) add_target_topics([topic]) refresh_metadata_if_necessary! @@ -311,12 +295,11 @@ response = broker.list_offsets( topics: { topic => broker_partitions.map {|partition| { partition: partition, - time: offset, - max_offsets: 1, + time: offset } } } ) @@ -424,8 +407,48 @@ @broker_pool.connect(info.host, info.port, node_id: info.node_id) end def controller_broker connect_to_broker(cluster_info.controller_id) + end + + def get_coordinator(coordinator_type, coordinator_key) + cluster_info.brokers.each do |broker_info| + begin + broker = connect_to_broker(broker_info.node_id) + response = broker.find_coordinator( + coordinator_type: coordinator_type, + coordinator_key: coordinator_key + ) + + Protocol.handle_error(response.error_code, response.error_message) + + coordinator_id = response.coordinator_id + + @logger.debug "Coordinator for `#{coordinator_key}` is #{coordinator_id}. Connecting..." + + # It's possible that a new broker is introduced to the cluster and + # becomes the coordinator before we have a chance to refresh_metadata. + coordinator = begin + connect_to_broker(coordinator_id) + rescue Kafka::NoSuchBroker + @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing" + refresh_metadata! + connect_to_broker(coordinator_id) + end + + @logger.debug "Connected to coordinator: #{coordinator} for `#{coordinator_key}`" + + return coordinator + rescue CoordinatorNotAvailable + @logger.debug "Coordinator not available; retrying in 1s" + sleep 1 + retry + rescue ConnectionError => e + @logger.error "Failed to get coordinator info from #{broker}: #{e}" + end + end + + raise Kafka::Error, "Failed to find coordinator" end end end