Sha256: cea423cc981654e9b32a32dced5c2a3ad37aa0d0490448b17fc6c1e148be3f1e

Contents?: true

Size: 1.18 KB

Versions: 1

Compression:

Stored size: 1.18 KB

Contents

require "logger"
require "kafka/connection"
require "kafka/protocol"

module Kafka
  class Cluster
    def self.connect(brokers:, client_id:, logger:)
      host, port = brokers.first.split(":", 2)

      connection = Connection.new(
        host: host,
        port: port.to_i,
        client_id: client_id,
        logger: logger
      )

      connection.open

      new(connection: connection, logger: logger)
    end

    def initialize(connection:, logger: nil)
      @connection = connection
      @logger = logger
    end

    def fetch_metadata(**options)
      api_key = Protocol::TOPIC_METADATA_API_KEY
      request = Protocol::TopicMetadataRequest.new(**options)
      response = Protocol::MetadataResponse.new

      @connection.write_request(api_key, request)
      @connection.read_response(response)

      response
    end

    def produce(**options)
      api_key = Protocol::PRODUCE_API_KEY
      request = Protocol::ProduceRequest.new(**options)

      @connection.write_request(api_key, request)

      if request.requires_acks?
        response = Protocol::ProduceResponse.new
        @connection.read_response(response)
        response
      else
        nil
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ruby-kafka-0.1.0.pre.alpha lib/kafka/cluster.rb