Sha256: 2efd0f1ca8e5fe0769d2754e16e37c382c00c851d40144f72113d2317af4752d
Contents?: true
Size: 1.3 KB
Versions: 1
Compression:
Stored size: 1.3 KB
Contents
require "kafka/broker_pool"
require "kafka/producer"

module Kafka
  # Client-side handle for a Kafka cluster.
  #
  # Holds a logger and a {BrokerPool} built from the seed brokers, and uses
  # that pool to build producers, list topics and shut down.
  class Client
    # Client identifier used when the caller does not pass `client_id:`.
    # Frozen so the shared default cannot be mutated through one client.
    DEFAULT_CLIENT_ID = "ruby-kafka".freeze

    # Initializes a new Kafka client.
    #
    # @param seed_brokers [Array<String>] the list of brokers used to initialize
    #   the client.
    #
    # @param client_id [String] the identifier for this application.
    #
    # @param logger [Logger]
    #
    # @param socket_timeout [Integer, nil] the timeout setting for socket
    #   connections. See {BrokerPool#initialize}.
    #
    # @return [Client]
    def initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger:, socket_timeout: nil)
      @logger = logger

      @broker_pool = BrokerPool.new(
        seed_brokers: seed_brokers,
        client_id: client_id,
        logger: logger,
        socket_timeout: socket_timeout,
      )
    end

    # Builds a new producer that shares this client's broker pool and logger.
    #
    # `options` are passed to {Producer#initialize}.
    #
    # @see Producer#initialize
    # @return [Producer] the Kafka producer.
    def get_producer(**options)
      Producer.new(broker_pool: @broker_pool, logger: @logger, **options)
    end

    # Lists all topics in the cluster, as reported by the broker pool.
    #
    # @return [Array<String>] the list of topic names.
    def topics
      @broker_pool.topics
    end

    # Shuts down the underlying broker pool.
    #
    # @return whatever {BrokerPool#shutdown} returns (not visible from this
    #   file).
    def close
      @broker_pool.shutdown
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
ruby-kafka-0.1.1 | lib/kafka/client.rb |