Sha256: 74da6a35202522591824d2bd22bdbed70732fa829c52dc20e6a58ab612e7b855

Contents?: true

Size: 876 Bytes

Versions: 1

Compression:

Stored size: 876 Bytes

Contents

module PgStream
  class Processor
    CALLBACK_TYPES = [:before_execute, :during_execute, :after_execute]

    def initialize(stream)
      @stream = stream
      @callbacks = CALLBACK_TYPES.map do |type|
        [type, []]
      end.to_h
      @row_count = 0
    end

    def register(args)
      args.each do |type, function|
        if CALLBACK_TYPES.include?(type)
          @callbacks[type] << function
        else
          raise "#{type} is not an acceptable callback type. Types include #{CALLBACK_TYPES.join(', ')}"
        end
      end
      @callbacks
    end

    def execute
      @callbacks[:before_execute].each(&:call)
      @stream.each_row do |row|
        @row_count += 1
        @callbacks[:during_execute].each { |y| y.call(row, @row_count) }
      end
      @callbacks[:after_execute].each { |y| y.call(@row_count) }
      @row_count
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pg_stream-0.1.0 lib/pg_stream/processor.rb