Sha256: c53d37f9d5a1c0aa872ca55ad270fe1d078d4fef1187416b0e6a5fd3ca50b7a5
Contents?: true
Size: 1.31 KB
Versions: 9
Compression:
Stored size: 1.31 KB
Contents
module Racecar
  # Holds class-level consumer settings and topic subscriptions, plus instance
  # helpers that delegate to a consumer and a producer injected via #configure.
  class Consumer
    # One topic subscription together with the fetch options it was declared with.
    Subscription = Struct.new(:topic, :start_from_beginning, :max_bytes_per_partition)

    class << self
      # Class-level settings. They are only stored here; whatever reads them
      # lives outside this file.
      attr_accessor :max_wait_time, :group_id, :offset_retention_time

      # The subscriptions declared on this class so far.
      #
      # The list is kept in a class-level instance variable, so each class
      # that calls this gets its own list.
      #
      # @return [Array<Subscription>]
      def subscriptions
        @subscriptions ||= []
      end

      # Adds one or more topic subscriptions.
      #
      # Can be called multiple times in order to subscribe to more topics.
      #
      # @param topics [String] one or more topics to subscribe to.
      # @param start_from_beginning [Boolean] whether to start from the beginning or the end
      #   of each partition.
      # @param max_bytes_per_partition [Integer] the maximum number of bytes to fetch from
      #   each partition at a time.
      # @return [nil]
      def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1_048_576)
        topics.each do |topic|
          subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition)
        end

        # Return nil explicitly so the method honours its documented contract
        # instead of leaking the `topics` array returned by `each`.
        nil
      end
    end

    # Stores the consumer and producer that #heartbeat and #produce delegate to.
    #
    # @param consumer [#trigger_heartbeat] object that receives heartbeat requests.
    # @param producer [#produce] object that receives produced values.
    def configure(consumer:, producer:)
      @_consumer = consumer
      @_producer = producer
    end

    # Intentionally empty; subclasses may override it.
    def teardown; end

    protected

    # Asks the configured consumer to trigger a heartbeat.
    # Requires #configure to have been called first.
    def heartbeat
      @_consumer.trigger_heartbeat
    end

    # Forwards a value and any keyword options, unchanged, to the configured producer.
    # Requires #configure to have been called first.
    #
    # @param value [Object] the value handed to the producer.
    # @param options [Hash] keyword options passed through as-is.
    def produce(value, **options)
      @_producer.produce(value, **options)
    end
  end
end
Version data entries
9 entries across 9 versions & 1 rubygems