Sha256: 5d43f67ed652e6f98ae272c8b9ed6ba69ad572c2466d5b7b3b3fe16ea57cc028

Size: 779 Bytes

Versions: 5

Stored size: 779 Bytes

Contents

module Telekinesis
  module Consumer
    # A RecordProcessor that uses the given block to process records. Useful to
    # quickly define a consumer.
    #
    # Telekinesis::Consumer::Worker.new(stream: 'my-stream', app: 'tail') do
    #   Telekinesis::Consumer::Block.new do |records, checkpointer, millis_behind_latest|
    #     records.each {|r| puts r}
    #     $stderr.puts "#{millis_behind_latest} ms behind"
    #     checkpointer.checkpoint
    #   end
    # end
    class Block < BaseProcessor
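      # Stores the block that handles each batch of records.
      # Raises ArgumentError if no block is given.
      def initialize(&block)
        raise ArgumentError, "No block given" unless block_given?
        @block = block
      end

      # Calls the stored block with the batch's records, its checkpointer,
      # and the number of milliseconds the consumer is behind the latest record.
      def process_records(input)
        @block.call(input.records, input.checkpointer, input.millis_behind_latest)
      end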
    end
  end
end
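
The following is a minimal, self-contained sketch of how Block unpacks its input, written so it can run without JRuby or the Kinesis Client Library. The empty BaseProcessor, the FakeInput struct, and the FakeCheckpointer class are hypothetical stand-ins introduced only for illustration; in the gem, BaseProcessor is provided by telekinesis and the input object is presumably supplied by the consumer at runtime. The 60_000 ms threshold is likewise an arbitrary example value.

```ruby
module Telekinesis
  module Consumer
    # Hypothetical empty stub; the real gem defines BaseProcessor itself.
    class BaseProcessor; end

    # Same logic as the class shown above, repeated so the sketch runs alone.
    class Block < BaseProcessor
      def initialize(&block)
        raise ArgumentError, "No block given" unless block_given?
        @block = block
      end

      def process_records(input)
        @block.call(input.records, input.checkpointer, input.millis_behind_latest)
      end
    end
  end
end

# Hypothetical input object exposing the three readers process_records uses.
FakeInput = Struct.new(:records, :checkpointer, :millis_behind_latest)

# Hypothetical checkpointer that only counts how often it was asked to checkpoint.
class FakeCheckpointer
  attr_reader :count

  def initialize
    @count = 0
  end

  def checkpoint
    @count += 1
  end
end

seen = []
checkpointer = FakeCheckpointer.new

# Collect every record, and checkpoint only when the consumer is close to the
# tip of the stream (threshold chosen arbitrarily for this example).
processor = Telekinesis::Consumer::Block.new do |records, cp, millis_behind|
  seen.concat(records)
  cp.checkpoint if millis_behind < 60_000
end

processor.process_records(FakeInput.new(%w[a b], checkpointer, 1_500))
processor.process_records(FakeInput.new(%w[c], checkpointer, 120_000))
```

After both calls, seen holds all three records, while the checkpointer has been invoked only for the first batch, since the second batch reported a lag above the threshold. Passing the checkpointer into the block leaves the checkpoint policy entirely to the caller.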

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
telekinesis-3.2.1-java lib/telekinesis/consumer/block.rb
telekinesis-3.2.0-java lib/telekinesis/consumer/block.rb
telekinesis-3.1.1-java lib/telekinesis/consumer/block.rb
telekinesis-3.1.0-java lib/telekinesis/consumer/block.rb
telekinesis-3.0.0-java lib/telekinesis/consumer/block.rb