Sha256: cb211fccd78feb164e25ee0e091ec607846dd421f062ba768044e86295785605

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

require "logger"
require "kafka/connection"
require "kafka/protocol"

module Kafka
  class Broker
    def self.connect(node_id: nil, logger:, **options)
      connection = Connection.new(logger: logger, **options)
      new(connection: connection, node_id: node_id, logger: logger)
    end

    def initialize(connection:, node_id: nil, logger:)
      @connection = connection
      @node_id = node_id
      @logger = logger
    end

    def to_s
      "#{@connection} (node_id=#{@node_id.inspect})"
    end

    def disconnect
      @connection.close
    end

    def fetch_metadata(**options)
      api_key = Protocol::TOPIC_METADATA_API_KEY
      request = Protocol::TopicMetadataRequest.new(**options)
      response_class = Protocol::MetadataResponse

      response = @connection.request(api_key, request, response_class)

      response.topics.each do |topic|
        Protocol.handle_error(topic.topic_error_code)

        topic.partitions.each do |partition|
          begin
            Protocol.handle_error(partition.partition_error_code)
          rescue ReplicaNotAvailable
            # This error can be safely ignored per the protocol specification.
            @logger.warn "Replica not available for #{topic.topic_name}/#{partition.partition_id}"
          end
        end
      end

      response
    end

    def produce(**options)
      api_key = Protocol::PRODUCE_API_KEY
      request = Protocol::ProduceRequest.new(**options)
      response_class = request.requires_acks? ? Protocol::ProduceResponse : nil

      @connection.request(api_key, request, response_class)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ruby-kafka-0.1.1 lib/kafka/broker.rb