Sha256: 17c4dc6f35290177583f078b14a38b971799d534fd4595b6a7d4b0b26f47f52f

Contents?: true

Size: 1.52 KB

Versions: 5

Compression:

Stored size: 1.52 KB

Contents

# frozen_string_literal: true

module Kril
  # High level abstraction for producing records to topics.
  #
  # Holds one asynchronous and one synchronous producer created from the same
  # Kafka client and picks between them per call to #send.
  class Producer
    # avro   - Avro instance used to encode records before they are produced
    #          (must respond to #encode) [AvroTurf::Messaging]
    # kafka  - Kafka instance for creating producers (must respond to
    #          #async_producer and #producer) [Kafka]
    # config - producer configuration (optional) [Hash]
    #          :required_acks and :delivery_threshold default to 1 when
    #          missing or nil. The caller's hash is never modified.
    def initialize(avro: nil, kafka: nil, config: {})
      # Work on a copy so defaults are not written back into the caller's
      # hash (which may be frozen or shared).
      async_config = config.dup
      async_config[:required_acks] ||= 1
      async_config[:delivery_threshold] ||= 1
      # :delivery_threshold is only handed to the async producer.
      sync_config = async_config.reject { |key, _| key == :delivery_threshold }

      @avro = avro
      # Options are splatted so they arrive as keyword arguments; a bare
      # positional hash is rejected by keyword-only methods on Ruby 3+.
      @async = kafka.async_producer(**async_config)
      @sync = kafka.producer(**sync_config)
    end

    # Commit a record to a topic.
    #
    # Both underlying producers are shut down after every call, whether or
    # not encoding/producing succeeded.
    #
    # record       - record to serialize and commit [String]
    # schema_name  - name of schema to encode record from [String]
    # namespace    - namespace of schema (optional) [String]
    # topic        - name of topic. Will be schema_name if nil (optional) [String]
    # syncronous   - legacy (misspelled) name for synchronous, kept so existing
    #                callers keep working (optional) [Boolean]
    # synchronous  - blocks until commit if true (optional) [Boolean]
    def send(record:,
             schema_name:,
             namespace: nil,
             topic: nil,
             syncronous: false,
             synchronous: false)
      topic ||= schema_name
      encoded = @avro.encode(record,
                             schema_name: schema_name,
                             namespace: namespace)
      # Either spelling of the flag selects the blocking path.
      if syncronous || synchronous
        @sync.produce(encoded, topic: topic)
        @sync.deliver_messages
      else
        @async.produce(encoded, topic: topic)
      end
    ensure
      # Nested ensure: the sync producer is still shut down even when
      # shutting down the async producer raises.
      begin
        @async.shutdown
      ensure
        @sync.shutdown
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
kril-0.2.4 lib/kril/producer.rb
kril-0.2.3 lib/kril/producer.rb
kril-0.2.2 lib/kril/producer.rb
kril-0.2.1 lib/kril/producer.rb
kril-0.2.0 lib/kril/producer.rb