Sha256: c2cde03b7ccd95123bf9d49b50759b7673faf23bb668ebacc75545b13089f389

Contents?: true

Size: 1.74 KB

Versions: 1

Compression:

Stored size: 1.74 KB

Contents

require "socket"
require "kafka/protocol/request_message"
require "kafka/protocol/encoder"
require "kafka/protocol/decoder"

module Kafka
  ConnectionError = Class.new(StandardError)

  class Connection
    def initialize(host:, port:, client_id:, logger:)
      @host = host
      @port = port
      @client_id = client_id
      @logger = logger
      @socket = nil
      @correlation_id = 0
      @socket_timeout = 1000
    end

    def open
      @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."

      @socket = TCPSocket.new(@host, @port)
    rescue SocketError => e
      @logger.error "Failed to connect to #{@host}:#{@port}: #{e}"

      raise ConnectionError, e
    end

    def write_request(api_key, request)
      @correlation_id += 1

      message = Kafka::Protocol::RequestMessage.new(
        api_key: api_key,
        api_version: 0,
        correlation_id: @correlation_id,
        client_id: @client_id,
        request: request,
      )

      buffer = StringIO.new
      message_encoder = Kafka::Protocol::Encoder.new(buffer)
      message.encode(message_encoder)

      @logger.info "Sending request #{@correlation_id} (#{request.class})..."

      connection_encoder = Kafka::Protocol::Encoder.new(@socket)
      connection_encoder.write_bytes(buffer.string)
    end

    def read_response(response)
      @logger.info "Reading response #{response.class}"

      connection_decoder = Kafka::Protocol::Decoder.new(@socket)
      bytes = connection_decoder.bytes

      buffer = StringIO.new(bytes)
      response_decoder = Kafka::Protocol::Decoder.new(buffer)

      correlation_id = response_decoder.int32

      @logger.info "Correlation id #{correlation_id}"

      response.decode(response_decoder)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ruby-kafka-0.1.0.pre.alpha lib/kafka/connection.rb