Sha256: 5d43f67ed652e6f98ae272c8b9ed6ba69ad572c2466d5b7b3b3fe16ea57cc028
Contents?: true
Size: 779 Bytes
Versions: 5
Compression:
Stored size: 779 Bytes
Contents
module Telekinesis
  module Consumer
    # A RecordProcessor whose record handling is delegated to a block supplied
    # at construction time. Handy for defining a consumer quickly without
    # writing a dedicated processor class.
    #
    #   Telekinesis::Consumer::Worker.new(stream: 'my-stream', app: 'tail') do
    #     Telekinesis::Consumer::Block.new do |records, checkpointer, millis_behind_latest|
    #       records.each {|r| puts r}
    #       $stderr.puts "#{millis_behind_latest} ms behind"
    #       checkpointer.checkpoint
    #     end
    #   end
    class Block < BaseProcessor
      # Stores the handler block for later use by #process_records.
      #
      # @raise [ArgumentError] if no block is supplied
      def initialize(&handler)
        raise ArgumentError, "No block given" if handler.nil?

        @block = handler
      end

      # Unpacks the input and invokes the stored block with
      # (records, checkpointer, millis_behind_latest), in that order.
      #
      # @param input an object responding to #records, #checkpointer and
      #   #millis_behind_latest
      # @return whatever the stored block returns
      def process_records(input)
        records = input.records
        checkpointer = input.checkpointer
        millis_behind = input.millis_behind_latest
        @block.call(records, checkpointer, millis_behind)
      end
    end
  end
end
Version data entries
5 entries across 5 versions & 1 rubygems