Sha256: 931488be982aac4aa88d895e9b0b8ab3e12faa6b9a5ac57d6c989b1e549d3cbc
Contents?: true
Size: 697 Bytes
Versions: 30
Compression:
Stored size: 697 Bytes
Contents
require "kafka/broker" module Kafka class BrokerPool def initialize(connection_builder:, logger:) @logger = logger @connection_builder = connection_builder @brokers = {} end def connect(host, port, node_id: nil) return @brokers.fetch(node_id) if @brokers.key?(node_id) broker = Broker.new( connection: @connection_builder.build_connection(host, port), node_id: node_id, logger: @logger, ) @brokers[node_id] = broker unless node_id.nil? broker end def close @brokers.each do |id, broker| @logger.info "Disconnecting broker #{id}" broker.disconnect end end end end
Version data entries
30 entries across 30 versions & 1 rubygems