lib/kafka/cluster.rb in ruby-kafka-0.7.0 vs lib/kafka/cluster.rb in ruby-kafka-0.7.1.beta1
- old
+ new
@@ -109,52 +109,36 @@
# @return [Broker] the broker that's currently leader.
def get_leader(topic, partition)
connect_to_broker(get_leader_id(topic, partition))
end
+ # Finds the broker acting as the coordinator of the given group.
+ #
+ # @param group_id: [String]
+ # @return [Broker] the broker that's currently coordinator.
def get_group_coordinator(group_id:)
@logger.debug "Getting group coordinator for `#{group_id}`"
-
refresh_metadata_if_necessary!
+ get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_GROUP, group_id)
+ end
- cluster_info.brokers.each do |broker_info|
- begin
- broker = connect_to_broker(broker_info.node_id)
- response = broker.find_coordinator(
- coordinator_type: Kafka::Protocol::COORDINATOR_TYPE_GROUP,
- coordinator_key: group_id
- )
+ # Finds the broker acting as the coordinator of the given transaction.
+ #
+ # @param transactional_id: [String]
+ # @return [Broker] the broker that's currently coordinator.
+ def get_transaction_coordinator(transactional_id:)
+ @logger.debug "Getting transaction coordinator for `#{transactional_id}`"
- Protocol.handle_error(response.error_code, response.error_message)
+ refresh_metadata_if_necessary!
- coordinator_id = response.coordinator_id
-
- @logger.debug "Coordinator for group `#{group_id}` is #{coordinator_id}. Connecting..."
-
- # It's possible that a new broker is introduced to the cluster and
- # becomes the coordinator before we have a chance to refresh_metadata.
- coordinator = begin
- connect_to_broker(coordinator_id)
- rescue Kafka::NoSuchBroker
- @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing"
- refresh_metadata!
- connect_to_broker(coordinator_id)
- end
-
- @logger.debug "Connected to coordinator: #{coordinator} for group `#{group_id}`"
-
- return coordinator
- rescue CoordinatorNotAvailable
- @logger.debug "Coordinator not available; retrying in 1s"
- sleep 1
- retry
- rescue ConnectionError => e
- @logger.error "Failed to get group coordinator info from #{broker}: #{e}"
- end
+ if transactional_id.nil?
+ # Get a random_broker
+ @logger.debug "Transaction ID is not available. Choose a random broker."
+ return random_broker
+ else
+ get_coordinator(Kafka::Protocol::COORDINATOR_TYPE_TRANSACTION, transactional_id)
end
-
- raise Kafka::Error, "Failed to find group coordinator"
end
def partitions_for(topic)
add_target_topics([topic])
refresh_metadata_if_necessary!
@@ -311,12 +295,11 @@
response = broker.list_offsets(
topics: {
topic => broker_partitions.map {|partition|
{
partition: partition,
- time: offset,
- max_offsets: 1,
+ time: offset
}
}
}
)
@@ -424,8 +407,48 @@
@broker_pool.connect(info.host, info.port, node_id: info.node_id)
end
def controller_broker
connect_to_broker(cluster_info.controller_id)
+ end
+
+ def get_coordinator(coordinator_type, coordinator_key)
+ cluster_info.brokers.each do |broker_info|
+ begin
+ broker = connect_to_broker(broker_info.node_id)
+ response = broker.find_coordinator(
+ coordinator_type: coordinator_type,
+ coordinator_key: coordinator_key
+ )
+
+ Protocol.handle_error(response.error_code, response.error_message)
+
+ coordinator_id = response.coordinator_id
+
+ @logger.debug "Coordinator for `#{coordinator_key}` is #{coordinator_id}. Connecting..."
+
+ # It's possible that a new broker is introduced to the cluster and
+ # becomes the coordinator before we have a chance to refresh_metadata.
+ coordinator = begin
+ connect_to_broker(coordinator_id)
+ rescue Kafka::NoSuchBroker
+ @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing"
+ refresh_metadata!
+ connect_to_broker(coordinator_id)
+ end
+
+ @logger.debug "Connected to coordinator: #{coordinator} for `#{coordinator_key}`"
+
+ return coordinator
+ rescue CoordinatorNotAvailable
+ @logger.debug "Coordinator not available; retrying in 1s"
+ sleep 1
+ retry
+ rescue ConnectionError => e
+ @logger.error "Failed to get coordinator info from #{broker}: #{e}"
+ end
+ end
+
+ raise Kafka::Error, "Failed to find coordinator"
end
end
end