lib/kril/producer.rb in kril-0.1.0 vs lib/kril/producer.rb in kril-0.1.1

- old
+ new

@@ -1,25 +1,34 @@ # frozen_string_literal: true module Kril - # Produces records to Kafka + # High level abstraction for producing records to topics. class Producer + # avro - Avro instance for deserializing records [AvroTurf::Messaging] + # kafka - Kafka instance for creating producers [Kafka] + # config - producer configuration (optional) [Hash] def initialize(avro: nil, kafka: nil, config: {}) config[:required_acks] ||= 1 config[:delivery_threshold] ||= 1 sync_config = config.dup @avro = avro @async = kafka.async_producer(config) sync_config.delete(:delivery_threshold) @sync = kafka.producer(sync_config) end + # Commit a record to a topic. + # + # record - record to serialize and commit [String] + # schema_name - name of schema to encode record from [String] + # topic - name of topic. Will be schema_name if nil (optional) [String] + # synchronous - blocks until commit if true (optional) [Boolean] def send(record:, schema_name:, topic: nil, syncronous: false) topic ||= schema_name encoded = @avro.encode(record, schema_name: schema_name) if syncronous - @producer.produce(encoded, topic: topic) - @producer.deliver_messages + @sync.produce(encoded, topic: topic) + @sync.deliver_messages else @async.produce(encoded, topic: topic) end ensure @async.shutdown