Sha256: 7a058bc77fe154f52c598e02155966e4c5622988927c71ddff64e3d5353b1291

Contents?: true

Size: 642 Bytes

Versions: 7

Compression:

Stored size: 642 Bytes

Contents

require 'concurrent/channel/waitable_list'

module Concurrent
  module Channel

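    # @api Channel
    # @!macro edge_warning
    #
    # A channel with no buffer: a pushed value is handed directly to a
    # probe registered by a receiver (via #pop or #select).
    class UnbufferedChannel

      def initialize
        # Probes registered by receivers waiting for a value.
        @probe_set = WaitableList.new
      end

      # Number of probes currently registered on this channel.
      def probe_set_size
        @probe_set.size
      end

      # Offers the value to registered probes, one at a time, until one
      # accepts it. A probe whose try_set returns false is skipped and the
      # next one is taken.
      def push(value)
        until @probe_set.take.try_set([value, self])
        end
      end

      # Registers a fresh probe and returns the value delivered to it.
      def pop
        probe = Channel::Probe.new
        select(probe)
        probe.value
      end

      # Registers a caller-supplied probe with this channel.
      def select(probe)
        @probe_set.put(probe)
      end

      # Withdraws a probe that was registered with #select.
      def remove_probe(probe)
        @probe_set.delete(probe)
      end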

    end
  end
end
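
The listing above depends on WaitableList and Channel::Probe, which are not shown on this page. The following is a minimal, self-contained sketch of the same hand-off pattern, assuming that WaitableList#take blocks until an item is available and that a probe is a write-once slot whose value call waits until it has been set. SketchWaitableList, SketchProbe and SketchUnbufferedChannel are hypothetical stand-ins written for illustration; they are not the gem's implementation.

```ruby
require 'thread'

# Hypothetical stand-in for WaitableList (assumption: #take blocks while empty).
class SketchWaitableList
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  def size
    @mutex.synchronize { @items.size }
  end

  def put(item)
    @mutex.synchronize do
      @items << item
      @cond.signal
    end
  end

  def delete(item)
    @mutex.synchronize { @items.delete(item) }
  end

  # Waits until at least one item is present, then removes and returns it.
  def take
    @mutex.synchronize do
      @cond.wait(@mutex) while @items.empty?
      @items.shift
    end
  end
end

# Hypothetical stand-in for Channel::Probe (assumption: a write-once slot).
class SketchProbe
  def initialize
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @set   = false
    @composite = nil
  end

  # Stores [value, channel] and returns true for the first caller only.
  def try_set(composite)
    @mutex.synchronize do
      return false if @set
      @composite = composite
      @set = true
      @cond.broadcast
      true
    end
  end

  # Waits until the slot has been set, then returns the value part.
  def value
    @mutex.synchronize do
      @cond.wait(@mutex) until @set
      @composite[0]
    end
  end
end

# Same shape as UnbufferedChannel above, built on the stand-ins.
class SketchUnbufferedChannel
  def initialize
    @probe_set = SketchWaitableList.new
  end

  def push(value)
    until @probe_set.take.try_set([value, self])
    end
  end

  def pop
    probe = SketchProbe.new
    @probe_set.put(probe)
    probe.value
  end
end

channel  = SketchUnbufferedChannel.new
consumer = Thread.new { channel.pop } # registers a probe, then waits
channel.push(42)                      # waits for a probe, then fills it
result = consumer.value
puts result # prints 42
```

In this sketch neither side can finish alone: push waits in take until a receiver has registered a probe, and pop waits in value until a sender has set it.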

Version data entries

7 entries across 7 versions and 1 rubygem

Version Path
concurrent-ruby-edge-0.1.2 lib/concurrent/channel/unbuffered_channel.rb
concurrent-ruby-edge-0.2.0.pre2 lib/concurrent/channel/unbuffered_channel.rb
concurrent-ruby-edge-0.2.0.pre1 lib/concurrent/channel/unbuffered_channel.rb
concurrent-ruby-edge-0.1.1 lib/concurrent/channel/unbuffered_channel.rb
concurrent-ruby-edge-0.1.0 lib/concurrent/channel/unbuffered_channel.rb
concurrent-ruby-edge-0.1.0.pre3 lib/concurrent/channel/unbuffered_channel.rb
concurrent-ruby-edge-0.1.0.pre2 lib/concurrent/channel/unbuffered_channel.rb