Sha256: dedf6157366431bf7ab1a4584fa2dc2dd7d472f01941a8e061fe301b32083a1b

Contents?: true

Size: 1.72 KB

Versions: 5

Compression:

Stored size: 1.72 KB

Contents

require 'concurrent/channel/waitable_list'

module Concurrent
  module Channel

    # A channel backed by a fixed-size buffer.
    #
    # A value handed to {#push} is offered to a registered probe when the probe
    # set is non-empty; otherwise it is stored in the buffer, and the pushing
    # thread waits while the buffer is full. A consumer calling {#select}
    # either receives the value at the head of the buffer or, when the buffer
    # is empty, has its probe registered so that a later {#push} can set it.
    #
    # Every buffer access and every probe-set mutation performed by this class
    # happens while holding one internal mutex, so the "is a probe waiting?"
    # check in {#push} cannot be invalidated by a concurrent {#remove_probe}.
    #
    # @api Channel
    # @!macro edge_warning
    class BufferedChannel

      # @param size passed through to +RingBuffer.new+ (presumably the
      #   buffer capacity -- confirm against RingBuffer)
      def initialize(size)
        @mutex = Mutex.new
        @buffer_condition = ConditionVariable.new

        @probe_set = WaitableList.new
        @buffer = RingBuffer.new(size)
      end

      # @return the size reported by the probe set
      def probe_set_size
        @probe_set.size
      end

      # @return the buffer's +count+, read while holding the channel mutex
      def buffer_queue_size
        @mutex.synchronize { @buffer.count }
      end

      # Delivers +value+ to a registered probe or stores it in the buffer.
      #
      # A probe taken from the probe set may refuse the value (its +try_set+
      # returns a falsy result); in that case the delivery is attempted again
      # until it succeeds.
      #
      # @param value the value to deliver
      # @return [nil]
      def push(value)
        until set_probe_or_push_into_buffer(value)
        end
      end

      # Registers a fresh probe through {#select} and returns its value.
      #
      # @return whatever +Probe#value+ reports (presumably blocking until the
      #   probe has been set -- depends on Probe, which is not defined here)
      def pop
        probe = Channel::Probe.new
        select(probe)
        probe.value
      end

      # Offers the head of the buffer to +probe+, or registers +probe+ when
      # the buffer is empty.
      #
      # @param probe object responding to +try_set+
      # @return [true] when the buffer was empty and the probe was registered
      # @return the value removed from the buffer when the probe accepted it,
      #   or +nil+ when the probe refused it (the buffer is left untouched)
      def select(probe)
        @mutex.synchronize do

          if @buffer.empty?
            @probe_set.put(probe)
            true
          else
            # Only consume the buffered value once the probe has accepted it.
            shift_buffer if probe.try_set([peek_buffer, self])
          end

        end
      end

      # Removes +probe+ from the probe set.
      #
      # The removal runs under the channel mutex: without it, a probe could
      # vanish between the +empty?+ check and the +take+ performed by a
      # concurrent {#push}, leaving that pusher calling +take+ on an empty
      # probe set while still holding the channel mutex.
      #
      # @param probe the probe to remove
      # @return the result of the probe set's +delete+
      def remove_probe(probe)
        @mutex.synchronize { @probe_set.delete(probe) }
      end

      private

      # Stores +value+ in the buffer, waiting while the buffer is full.
      # Must be called with the channel mutex held (the wait releases it).
      def push_into_buffer(value)
        @buffer_condition.wait(@mutex) while @buffer.full?
        @buffer.offer value
        @buffer_condition.broadcast
      end

      # Returns the buffer's +peek+ result, waiting while the buffer is empty.
      # Must be called with the channel mutex held.
      def peek_buffer
        @buffer_condition.wait(@mutex) while @buffer.empty?
        @buffer.peek
      end

      # Removes and returns the buffer's +poll+ result, waiting while the
      # buffer is empty, then wakes threads waiting on the buffer condition.
      # Must be called with the channel mutex held.
      def shift_buffer
        @buffer_condition.wait(@mutex) while @buffer.empty?
        result = @buffer.poll
        @buffer_condition.broadcast
        result
      end

      # Performs one delivery attempt under the channel mutex.
      #
      # @return [true] when no probe was registered and the value was buffered
      # @return the result of the taken probe's +try_set+ otherwise; a falsy
      #   result makes {#push} retry
      def set_probe_or_push_into_buffer(value)
        @mutex.synchronize do
          if @probe_set.empty?
            push_into_buffer(value)
            true
          else
            @probe_set.take.try_set([value, self])
          end
        end
      end

    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
concurrent-ruby-edge-0.1.2 lib/concurrent/channel/buffered_channel.rb
concurrent-ruby-edge-0.1.1 lib/concurrent/channel/buffered_channel.rb
concurrent-ruby-edge-0.1.0 lib/concurrent/channel/buffered_channel.rb
concurrent-ruby-edge-0.1.0.pre3 lib/concurrent/channel/buffered_channel.rb
concurrent-ruby-edge-0.1.0.pre2 lib/concurrent/channel/buffered_channel.rb