Sha256: c53d37f9d5a1c0aa872ca55ad270fe1d078d4fef1187416b0e6a5fd3ca50b7a5

Contents?: true

Size: 1.31 KB

Versions: 9

Compression:

Stored size: 1.31 KB

Contents

module Racecar
  class Consumer
    Subscription = Struct.new(:topic, :start_from_beginning, :max_bytes_per_partition)

    class << self
      attr_accessor :max_wait_time
      attr_accessor :group_id
      attr_accessor :offset_retention_time

      def subscriptions
        @subscriptions ||= []
      end

      # Adds one or more topic subscriptions.
      #
      # Can be called multiple times in order to subscribe to more topics.
      #
      # @param topics [String] one or more topics to subscribe to.
      # @param start_from_beginning [Boolean] whether to start from the beginning or the end
      #   of each partition.
      # @param max_bytes_per_partition [Integer] the maximum number of bytes to fetch from
      #   each partition at a time.
      # @return [nil]
      def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576)
        topics.each do |topic|
          subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition)
        end
      end
    end

    def configure(consumer:, producer:)
      @_consumer = consumer
      @_producer = producer
    end

    def teardown; end

    protected

    def heartbeat
      @_consumer.trigger_heartbeat
    end

    def produce(value, **options)
      @_producer.produce(value, **options)
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
racecar-1.3.0 lib/racecar/consumer.rb
racecar-1.2.1 lib/racecar/consumer.rb
racecar-1.2.0 lib/racecar/consumer.rb
racecar-1.1.0 lib/racecar/consumer.rb
racecar-1.0.1 lib/racecar/consumer.rb
racecar-1.0.0 lib/racecar/consumer.rb
racecar-0.5.0 lib/racecar/consumer.rb
racecar-0.5.0.beta2 lib/racecar/consumer.rb
racecar-0.5.0.beta1 lib/racecar/consumer.rb