lib/kafka/cluster.rb in ruby-kafka-0.5.0 vs lib/kafka/cluster.rb in ruby-kafka-0.5.1.beta1
- old
+ new
@@ -1,7 +1,7 @@
-require "set"
require "kafka/broker_pool"
+require "set"
module Kafka
# A cluster represents the state of a Kafka cluster. It needs to be initialized
# with a non-empty list of seed brokers. The first seed broker that the cluster can connect
@@ -47,10 +47,25 @@
refresh_metadata!
end
end
+ def api_info(api_key)
+ apis.find {|api| api.api_key == api_key }
+ end
+
+ def apis
+ @apis ||=
+ begin
+ response = random_broker.api_versions
+
+ Protocol.handle_error(response.error_code)
+
+ response.apis
+ end
+ end
+
# Clears the list of target topics.
#
# @see #add_target_topics
# @return [nil]
def clear_target_topics
@@ -128,10 +143,45 @@
rescue Kafka::ProtocolError
mark_as_stale!
raise
end
+ def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30)
+ options = {
+ topics: {
+ name => {
+ num_partitions: num_partitions,
+ replication_factor: replication_factor,
+ }
+ },
+ timeout: timeout,
+ }
+
+ broker = controller_broker
+
+ @logger.info "Creating topic `#{name}` using controller broker #{broker}"
+
+ response = broker.create_topics(**options)
+
+ response.errors.each do |topic, error_code|
+ Protocol.handle_error(error_code)
+ end
+
+ begin
+ partitions_for(name).each do |info|
+ Protocol.handle_error(info.partition_error_code)
+ end
+ rescue Kafka::LeaderNotAvailable
+ @logger.warn "Leader not yet available for `#{name}`, waiting 1s..."
+ sleep 1
+
+ retry
+ end
+
+ @logger.info "Topic `#{name}` was created"
+ end
+
def resolve_offsets(topic, partitions, offset)
add_target_topics([topic])
refresh_metadata_if_necessary!
partitions_by_broker = partitions.each_with_object({}) {|partition, hsh|
@@ -176,10 +226,11 @@
def resolve_offset(topic, partition, offset)
resolve_offsets(topic, [partition], offset).fetch(partition)
end
def topics
+ refresh_metadata_if_necessary!
cluster_info.topics.map(&:topic_name)
end
def disconnect
@broker_pool.close
@@ -211,15 +262,19 @@
begin
broker = @broker_pool.connect(node.hostname, node.port)
cluster_info = broker.fetch_metadata(topics: @target_topics)
- @stale = false
+ if cluster_info.brokers.empty?
+ @logger.error "No brokers in cluster"
+ else
+ @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
- @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
+ @stale = false
- return cluster_info
+ return cluster_info
+ end
rescue Error => e
@logger.error "Failed to fetch metadata from #{node}: #{e}"
errors << [node, e]
ensure
broker.disconnect unless broker.nil?
@@ -229,12 +284,21 @@
error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n")
raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
end
+ def random_broker
+ node_id = cluster_info.brokers.sample.node_id
+ connect_to_broker(node_id)
+ end
+
def connect_to_broker(broker_id)
info = cluster_info.find_broker(broker_id)
@broker_pool.connect(info.host, info.port, node_id: info.node_id)
+ end
+
+ def controller_broker
+ connect_to_broker(cluster_info.controller_id)
end
end
end