Sha256: 30e0f4b5e9c20c34c4f390f304bb649cec7c2a89e921b51fe1d08d6745df4e9f

Contents?: true

Size: 654 Bytes

Versions: 2

Compression:

Stored size: 654 Bytes

Contents

module Rodimus

  class BufferedStep < Step
    # The maximum size of the buffer
    attr_accessor :buffer_size
    attr_reader   :buffer

    def initialize(buffer_size = 100)
      super()
      @buffer_size = buffer_size
      @buffer      = []
    end

    def close_descriptors # override
      flush if buffer.any?
      super
    end

    def handle_output(transformed_row) # override
      buffer << transformed_row

      if buffer.length >= buffer_size
        flush
      end
    end

    # Flush the contents of the buffer to the outgoing data stream
    def flush
      outgoing.puts(buffer.join("\n"))
      @buffer = []
    end
  end

end
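
The buffering behaviour in the listing above can be tried out without the rest of the gem. What follows is a minimal sketch, not the gem's own test or usage code. `SketchStep`, `SketchBufferedStep`, and `buffered_demo` are hypothetical names introduced for illustration. The stand-in parent class assumes that the real `Step` exposes a writable `outgoing` stream and a `close_descriptors` hook, which the listing implies but does not show. A `StringIO` plays the role of the outgoing stream so the writes can be inspected.

```ruby
require "stringio"

# Hypothetical stand-in for Rodimus::Step, reduced to what the sketch needs.
# Assumption: the real Step provides `outgoing` and `close_descriptors`.
class SketchStep
  attr_accessor :outgoing

  def close_descriptors
    # Left empty so the StringIO stays open and can be inspected afterwards.
  end
end

# Same logic as the BufferedStep listing, on top of the stand-in parent.
class SketchBufferedStep < SketchStep
  attr_accessor :buffer_size
  attr_reader   :buffer

  def initialize(buffer_size = 100)
    super()
    @buffer_size = buffer_size
    @buffer      = []
  end

  # Write out any rows still in the buffer before shutting down.
  def close_descriptors
    flush if buffer.any?
    super
  end

  # Collect rows and flush once the buffer reaches buffer_size.
  def handle_output(transformed_row)
    buffer << transformed_row
    flush if buffer.length >= buffer_size
  end

  # Write all buffered rows in a single puts call, then start a new buffer.
  def flush
    outgoing.puts(buffer.join("\n"))
    @buffer = []
  end
end

# Feed three rows through a step with buffer_size = 2 and record what has
# reached the stream after each stage.
def buffered_demo
  sink = StringIO.new
  step = SketchBufferedStep.new(2)
  step.outgoing = sink

  step.handle_output("a")
  after_one = sink.string.dup   # still buffered, nothing written

  step.handle_output("b")
  after_two = sink.string.dup   # buffer full, both rows written together

  step.handle_output("c")
  step.close_descriptors        # leftover row is flushed on close
  final = sink.string.dup

  { after_one: after_one, after_two: after_two, final: final }
end
```

Calling `buffered_demo` returns the stream contents at each stage: nothing is written after the first row, the first two rows arrive together once the buffer fills, and the remaining row is written when `close_descriptors` runs. That final flush on close is what prevents a partially filled buffer from being dropped.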

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rodimus-1.3.1 lib/rodimus/buffered_step.rb
rodimus-1.3.0 lib/rodimus/buffered_step.rb