Sha256: 75b578f27a4e5595f88b8a64898b21bea5bfd2a852c981e4119901ae1fd0b7d7

Contents?: true

Size: 938 Bytes

Versions: 1

Compression:

Stored size: 938 Bytes

Contents

# frozen_string_literal: true

require 'ddbuffer/version'

# Buffers an enumerable: a background thread walks the source and pushes its
# elements onto a bounded queue, from which a lazy enumerator hands them to
# the consumer.
#
# @example
#   buffered = DDBuffer.new(10).call(source)
#   buffered.each { |item| handle(item) }
class DDBuffer
  # Internal helpers used by {DDBuffer#call}.
  module Impl
    # Sentinel pushed after the last element when the source finished normally.
    STOP_OK = Object.new
    # Sentinel pushed when the source raised; the exception object is the
    # queue entry that immediately follows it.
    STOP_ERR = Object.new

    # Starts a thread that pushes every element of +enum+ onto +queue+,
    # followed by {STOP_OK}. If enumeration raises a StandardError, pushes
    # {STOP_ERR} and then the exception instead.
    #
    # NOTE(review): exceptions outside StandardError are not forwarded, and
    # nothing here unblocks the thread if the consumer stops reading while
    # the queue is full — confirm whether callers need either case handled.
    #
    # @param enum [#each] source of elements
    # @param queue [#<<] destination queue ({DDBuffer#call} passes a SizedQueue)
    # @return [Thread] the collector thread
    def self.gen_collector_thread(enum, queue)
      Thread.new do
        begin
          enum.each { |e| queue << e }
          queue << STOP_OK
        rescue StandardError => e
          queue << STOP_ERR
          queue << e
        end
      end
    end

    # Builds the consumer side: a lazy enumerator that pops entries from
    # +queue+ and yields them until it sees {STOP_OK}, or re-raises the
    # exception that follows {STOP_ERR}. The collector thread is joined
    # before the enumerator finishes or re-raises.
    #
    # @param queue [#pop] queue filled by the collector thread
    # @param collector_thread [Thread] thread returned by {gen_collector_thread}
    # @return [Enumerator::Lazy]
    # @raise [StandardError] whatever the source raised while being enumerated
    def self.gen_enumerator(queue, collector_thread)
      Enumerator.new do |y|
        # Deliberately not Kernel#loop: loop rescues StopIteration, which
        # would silently swallow a StopIteration forwarded from the source
        # (or raised by the downstream consumer) and end iteration early.
        until STOP_OK.equal?((item = queue.pop))
          if STOP_ERR.equal?(item)
            error = queue.pop
            # The collector has pushed its last entry; reap it before raising.
            collector_thread.join
            raise error
          end

          y << item
        end
        collector_thread.join
      end.lazy
    end
  end

  # @param size [Integer] capacity handed to SizedQueue.new by {#call}
  def initialize(size)
    @size = size
  end

  # Starts buffering +enum+ and returns the buffered view of it. Every call
  # creates its own queue and collector thread.
  #
  # @param enum [#each] source to buffer
  # @return [Enumerator::Lazy] yields the elements of +enum+ in order
  def call(enum)
    queue = SizedQueue.new(@size)
    thread = Impl.gen_collector_thread(enum, queue)
    Impl.gen_enumerator(queue, thread)
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
ddbuffer-1.0.0 lib/ddbuffer.rb