lib/kafka/cluster.rb in ruby-kafka-0.5.0 vs lib/kafka/cluster.rb in ruby-kafka-0.5.1.beta1

- old
+ new

@@ -1,7 +1,7 @@ -require "set" require "kafka/broker_pool" +require "set" module Kafka # A cluster represents the state of a Kafka cluster. It needs to be initialized # with a non-empty list of seed brokers. The first seed broker that the cluster can connect @@ -47,10 +47,25 @@ refresh_metadata! end end + def api_info(api_key) + apis.find {|api| api.api_key == api_key } + end + + def apis + @apis ||= + begin + response = random_broker.api_versions + + Protocol.handle_error(response.error_code) + + response.apis + end + end + # Clears the list of target topics. # # @see #add_target_topics # @return [nil] def clear_target_topics @@ -128,10 +143,45 @@ rescue Kafka::ProtocolError mark_as_stale! raise end + def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30) + options = { + topics: { + name => { + num_partitions: num_partitions, + replication_factor: replication_factor, + } + }, + timeout: timeout, + } + + broker = controller_broker + + @logger.info "Creating topic `#{name}` using controller broker #{broker}" + + response = broker.create_topics(**options) + + response.errors.each do |topic, error_code| + Protocol.handle_error(error_code) + end + + begin + partitions_for(name).each do |info| + Protocol.handle_error(info.partition_error_code) + end + rescue Kafka::LeaderNotAvailable + @logger.warn "Leader not yet available for `#{name}`, waiting 1s..." + sleep 1 + + retry + end + + @logger.info "Topic `#{name}` was created" + end + def resolve_offsets(topic, partitions, offset) add_target_topics([topic]) refresh_metadata_if_necessary! partitions_by_broker = partitions.each_with_object({}) {|partition, hsh| @@ -176,10 +226,11 @@ def resolve_offset(topic, partition, offset) resolve_offsets(topic, [partition], offset).fetch(partition) end def topics + refresh_metadata_if_necessary! cluster_info.topics.map(&:topic_name) end def disconnect @broker_pool.close @@ -211,15 +262,19 @@ begin broker = @broker_pool.connect(node.hostname, node.port) cluster_info = broker.fetch_metadata(topics: @target_topics) - @stale = false + if cluster_info.brokers.empty? + @logger.error "No brokers in cluster" + else + @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" - @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" + @stale = false - return cluster_info + return cluster_info + end rescue Error => e @logger.error "Failed to fetch metadata from #{node}: #{e}" errors << [node, e] ensure broker.disconnect unless broker.nil? @@ -229,12 +284,21 @@ error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n") raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}" end + def random_broker + node_id = cluster_info.brokers.sample.node_id + connect_to_broker(node_id) + end + def connect_to_broker(broker_id) info = cluster_info.find_broker(broker_id) @broker_pool.connect(info.host, info.port, node_id: info.node_id) + end + + def controller_broker + connect_to_broker(cluster_info.controller_id) end end end