Sha256: 17c4dc6f35290177583f078b14a38b971799d534fd4595b6a7d4b0b26f47f52f
Contents?: true
Size: 1.52 KB
Versions: 5
Compression:
Stored size: 1.52 KB
Contents
# frozen_string_literal: true module Kril # High level abstraction for producing records to topics. class Producer # avro - Avro instance for deserializing records [AvroTurf::Messaging] # kafka - Kafka instance for creating producers [Kafka] # config - producer configuration (optional) [Hash] def initialize(avro: nil, kafka: nil, config: {}) config[:required_acks] ||= 1 config[:delivery_threshold] ||= 1 sync_config = config.dup @avro = avro @async = kafka.async_producer(config) sync_config.delete(:delivery_threshold) @sync = kafka.producer(sync_config) end # Commit a record to a topic. # # record - record to serialize and commit [String] # schema_name - name of schema to encode record from [String] # namespace - namespace of schema (optional) [String] # topic - name of topic. Will be schema_name if nil (optional) [String] # synchronous - blocks until commit if true (optional) [Boolean] def send(record:, schema_name:, namespace: nil, topic: nil, syncronous: false) topic ||= schema_name encoded = @avro.encode(record, schema_name: schema_name, namespace: namespace) if syncronous @sync.produce(encoded, topic: topic) @sync.deliver_messages else @async.produce(encoded, topic: topic) end ensure @async.shutdown @sync.shutdown end end end
Version data entries
5 entries across 5 versions & 1 rubygems
Version | Path |
---|---|
kril-0.2.4 | lib/kril/producer.rb |
kril-0.2.3 | lib/kril/producer.rb |
kril-0.2.2 | lib/kril/producer.rb |
kril-0.2.1 | lib/kril/producer.rb |
kril-0.2.0 | lib/kril/producer.rb |