Sha256: d55050e03b2045252dfd5ee4ba31002fbe029e93dd05be604d41bcf45389a13c

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

module PgConduit
  # A multi threaded stream reader
  class ParallelStreamReader
    # @param query_stream [PgConduit::QueryStream]
    # @param threads [Integer] The number of threads to use for workers
    # @param queue_max_size [Integer] How many rows should be stored in memory
    #   in the work queue.
    def initialize(query_stream, threads: 5, queue_max_size: 1000)
      @queue = SizedQueue.new(queue_max_size)
      @workers = threads
      @stream = query_stream
    end

    # Read A QueryStream and yield it's rows
    #
    # @yield [Hash] A single row from the QueryStream. Every row from the stream
    #   will be yielded but order is not guaranteed.
    def read(&callback)
      reader  = read_stream(@stream)
      workers = dispatch_workers(&callback)
      reader.join
      workers.each { |t| t.join }
      :ok
    end

    private

    def read_stream(query_stream)
      Thread.new do
        query_stream.each_row { |row| @queue << row }
        @queue.close
      end
    end

    def dispatch_workers(&callback)
      (1..@workers).to_a.map { dispatch_worker(&callback) }
    end

    def dispatch_worker(&callback)
      Thread.new do
        loop do
          continue = process_next(&callback)
          break if @queue.closed? && !continue
        end
      end
    end

    def process_next(&callback)
      continue = false
      Thread.new do
        row = @queue.deq
        if row
          callback.call row
          continue = true
        end
      end.join
      continue
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pg_conduit-0.1.0 lib/pg_conduit/parallel_stream_reader.rb