lib/concurrent/channel.rb in concurrent-ruby-edge-0.2.0.pre2 vs lib/concurrent/channel.rb in concurrent-ruby-edge-0.2.0.pre3
- old
+ new
@@ -1,6 +1,272 @@
-require 'concurrent/channel/blocking_ring_buffer'
-require 'concurrent/channel/buffered_channel'
-require 'concurrent/channel/channel'
-require 'concurrent/channel/ring_buffer'
-require 'concurrent/channel/unbuffered_channel'
-require 'concurrent/channel/waitable_list'
+require 'concurrent/channel/buffer'
+require 'concurrent/channel/selector'
+
+require 'concurrent/maybe'
+require 'concurrent/executor/cached_thread_pool'
+
+module Concurrent
+
+ # {include:file:doc/channel.md}
+ class Channel
+ include Enumerable
+
+ GOROUTINES = Concurrent::CachedThreadPool.new
+ private_constant :GOROUTINES
+
+ BUFFER_TYPES = {
+ unbuffered: Buffer::Unbuffered,
+ buffered: Buffer::Buffered,
+ dropping: Buffer::Dropping,
+ sliding: Buffer::Sliding
+ }.freeze
+ private_constant :BUFFER_TYPES
+
+ DEFAULT_VALIDATOR = ->(value){ true }
+ private_constant :DEFAULT_VALIDATOR
+
+ Error = Class.new(StandardError)
+
+ class ValidationError < Error
+ def initialize(message = nil)
+ message ||= 'invalid value'
+ end
+ end
+
+ def initialize(opts = {})
+ # undocumented -- for internal use only
+ if opts.is_a? Buffer::Base
+ @buffer = opts
+ return
+ end
+
+ size = opts[:size]
+ buffer = opts[:buffer]
+
+ if size && buffer == :unbuffered
+ raise ArgumentError.new('unbuffered channels cannot have a size')
+ elsif size.nil? && buffer.nil?
+ @buffer = BUFFER_TYPES[:unbuffered].new
+ elsif size == 0 && buffer == :buffered
+ @buffer = BUFFER_TYPES[:unbuffered].new
+ elsif buffer == :unbuffered
+ @buffer = BUFFER_TYPES[:unbuffered].new
+ elsif size.nil? || size < 1
+ raise ArgumentError.new('size must be at least 1 for this buffer type')
+ else
+ buffer ||= :buffered
+ @buffer = BUFFER_TYPES[buffer].new(size)
+ end
+
+ @validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
+ end
+
+ def size
+ @buffer.size
+ end
+ alias_method :capacity, :size
+
+ def put(item)
+ return false unless validate(item, false, false)
+ do_put(item)
+ end
+ alias_method :send, :put
+ alias_method :<<, :put
+
+ def put!(item)
+ validate(item, false, true)
+ ok = do_put(item)
+ raise Error if !ok
+ ok
+ end
+
+ def put?(item)
+ if !validate(item, true, false)
+ Concurrent::Maybe.nothing('invalid value')
+ elsif do_put(item)
+ Concurrent::Maybe.just(true)
+ else
+ Concurrent::Maybe.nothing
+ end
+ end
+
+ def offer(item)
+ return false unless validate(item, false, false)
+ do_offer(item)
+ end
+
+ def offer!(item)
+ validate(item, false, true)
+ ok = do_offer(item)
+ raise Error if !ok
+ ok
+ end
+
+ def offer?(item)
+ if !validate(item, true, false)
+ Concurrent::Maybe.nothing('invalid value')
+ elsif do_offer(item)
+ Concurrent::Maybe.just(true)
+ else
+ Concurrent::Maybe.nothing
+ end
+ end
+
+ def take
+ item, _ = self.next
+ item
+ end
+ alias_method :receive, :take
+ alias_method :~, :take
+
+ def take!
+ item, _ = do_next
+ raise Error if item == Buffer::NO_VALUE
+ item
+ end
+
+ def take?
+ item, _ = self.next?
+ item
+ end
+
+ #
+ # @example
+ #
+ # jobs = Channel.new
+ #
+ # Channel.go do
+ # loop do
+ # j, more = jobs.next
+ # if more
+ # print "received job #{j}\n"
+ # else
+ # print "received all jobs\n"
+ # break
+ # end
+ # end
+ # end
+ def next
+ item, more = do_next
+ item = nil if item == Buffer::NO_VALUE
+ return item, more
+ end
+
+ def next?
+ item, more = do_next
+ item = if item == Buffer::NO_VALUE
+ Concurrent::Maybe.nothing
+ else
+ Concurrent::Maybe.just(item)
+ end
+ return item, more
+ end
+
+ def poll
+ (item = do_poll) == Buffer::NO_VALUE ? nil : item
+ end
+
+ def poll!
+ item = do_poll
+ raise Error if item == Buffer::NO_VALUE
+ item
+ end
+
+ def poll?
+ if (item = do_poll) == Buffer::NO_VALUE
+ Concurrent::Maybe.nothing
+ else
+ Concurrent::Maybe.just(item)
+ end
+ end
+
+ def each
+ raise ArgumentError.new('no block given') unless block_given?
+ loop do
+ item, more = do_next
+ if item != Buffer::NO_VALUE
+ yield(item)
+ elsif !more
+ break
+ end
+ end
+ end
+
+ def close
+ @buffer.close
+ end
+ alias_method :stop, :close
+
+ class << self
+ def timer(seconds)
+ Channel.new(Buffer::Timer.new(seconds))
+ end
+ alias_method :after, :timer
+
+ def ticker(interval)
+ Channel.new(Buffer::Ticker.new(interval))
+ end
+ alias_method :tick, :ticker
+
+ def select(*args)
+ raise ArgumentError.new('no block given') unless block_given?
+ selector = Selector.new
+ yield(selector, *args)
+ selector.execute
+ end
+ alias_method :alt, :select
+
+ def go(*args, &block)
+ go_via(GOROUTINES, *args, &block)
+ end
+
+ def go_via(executor, *args, &block)
+ raise ArgumentError.new('no block given') unless block_given?
+ executor.post(*args, &block)
+ end
+
+ def go_loop(*args, &block)
+ go_loop_via(GOROUTINES, *args, &block)
+ end
+
+ def go_loop_via(executor, *args, &block)
+ raise ArgumentError.new('no block given') unless block_given?
+ executor.post(block, *args) do
+ loop do
+ break unless block.call(*args)
+ end
+ end
+ end
+ end
+
+ private
+
+ def validate(value, allow_nil, raise_error)
+ if !allow_nil && value.nil?
+ raise_error ? raise(ValidationError.new('nil is not a valid value')) : false
+ elsif !@validator.call(value)
+ raise_error ? raise(ValidationError) : false
+ else
+ true
+ end
+ rescue => ex
+ # the validator raised an exception
+ return raise_error ? raise(ex) : false
+ end
+
+ def do_put(item)
+ @buffer.put(item)
+ end
+
+ def do_offer(item)
+ @buffer.offer(item)
+ end
+
+ def do_next
+ @buffer.next
+ end
+
+ def do_poll
+ @buffer.poll
+ end
+ end
+end