Sha256: 805895a56308fbcb02519e664eb399aa1bc08d28935c9d631c772865c7042d91
Contents?: true
Size: 853 Bytes
Versions: 15
Compression:
Stored size: 853 Bytes
Contents
require "kafka/broker" module Kafka class BrokerPool def initialize(connection_builder:, logger:) @logger = logger @connection_builder = connection_builder @brokers = {} end def connect(host, port, node_id: nil) if @brokers.key?(node_id) broker = @brokers.fetch(node_id) return broker if broker.address_match?(host, port) broker.disconnect @brokers[node_id] = nil end broker = Broker.new( connection_builder: @connection_builder, host: host, port: port, node_id: node_id, logger: @logger, ) @brokers[node_id] = broker unless node_id.nil? broker end def close @brokers.each do |id, broker| @logger.info "Disconnecting broker #{id}" broker.disconnect end end end end
Version data entries
15 entries across 15 versions & 1 rubygems