Sha256: fdc351d8b39427b0803b3cf40eba86f273b7e7ad1f12c3be38a951837b9bb172
Contents?: true
Size: 834 Bytes
Versions: 7
Compression:
Stored size: 834 Bytes
Contents
require "kafka/broker" module Kafka class BrokerPool def initialize(connection_builder:, logger:) @logger = logger @connection_builder = connection_builder @brokers = {} end def connect(host, port, node_id: nil) if @brokers.key?(node_id) broker = @brokers.fetch(node_id) return broker if broker.address_match?(host, port) broker.disconnect @brokers[node_id] = nil end broker = Broker.new( connection: @connection_builder.build_connection(host, port), node_id: node_id, logger: @logger, ) @brokers[node_id] = broker unless node_id.nil? broker end def close @brokers.each do |id, broker| @logger.info "Disconnecting broker #{id}" broker.disconnect end end end end
Version data entries
7 entries across 7 versions & 1 rubygems