lib/concurrent/channel.rb in concurrent-ruby-edge-0.2.0.pre2 vs lib/concurrent/channel.rb in concurrent-ruby-edge-0.2.0.pre3

- old
+ new

@@ -1,6 +1,272 @@ -require 'concurrent/channel/blocking_ring_buffer' -require 'concurrent/channel/buffered_channel' -require 'concurrent/channel/channel' -require 'concurrent/channel/ring_buffer' -require 'concurrent/channel/unbuffered_channel' -require 'concurrent/channel/waitable_list' +require 'concurrent/channel/buffer' +require 'concurrent/channel/selector' + +require 'concurrent/maybe' +require 'concurrent/executor/cached_thread_pool' + +module Concurrent + + # {include:file:doc/channel.md} + class Channel + include Enumerable + + GOROUTINES = Concurrent::CachedThreadPool.new + private_constant :GOROUTINES + + BUFFER_TYPES = { + unbuffered: Buffer::Unbuffered, + buffered: Buffer::Buffered, + dropping: Buffer::Dropping, + sliding: Buffer::Sliding + }.freeze + private_constant :BUFFER_TYPES + + DEFAULT_VALIDATOR = ->(value){ true } + private_constant :DEFAULT_VALIDATOR + + Error = Class.new(StandardError) + + class ValidationError < Error + def initialize(message = nil) + message ||= 'invalid value' + end + end + + def initialize(opts = {}) + # undocumented -- for internal use only + if opts.is_a? Buffer::Base + @buffer = opts + return + end + + size = opts[:size] + buffer = opts[:buffer] + + if size && buffer == :unbuffered + raise ArgumentError.new('unbuffered channels cannot have a size') + elsif size.nil? && buffer.nil? + @buffer = BUFFER_TYPES[:unbuffered].new + elsif size == 0 && buffer == :buffered + @buffer = BUFFER_TYPES[:unbuffered].new + elsif buffer == :unbuffered + @buffer = BUFFER_TYPES[:unbuffered].new + elsif size.nil? || size < 1 + raise ArgumentError.new('size must be at least 1 for this buffer type') + else + buffer ||= :buffered + @buffer = BUFFER_TYPES[buffer].new(size) + end + + @validator = opts.fetch(:validator, DEFAULT_VALIDATOR) + end + + def size + @buffer.size + end + alias_method :capacity, :size + + def put(item) + return false unless validate(item, false, false) + do_put(item) + end + alias_method :send, :put + alias_method :<<, :put + + def put!(item) + validate(item, false, true) + ok = do_put(item) + raise Error if !ok + ok + end + + def put?(item) + if !validate(item, true, false) + Concurrent::Maybe.nothing('invalid value') + elsif do_put(item) + Concurrent::Maybe.just(true) + else + Concurrent::Maybe.nothing + end + end + + def offer(item) + return false unless validate(item, false, false) + do_offer(item) + end + + def offer!(item) + validate(item, false, true) + ok = do_offer(item) + raise Error if !ok + ok + end + + def offer?(item) + if !validate(item, true, false) + Concurrent::Maybe.nothing('invalid value') + elsif do_offer(item) + Concurrent::Maybe.just(true) + else + Concurrent::Maybe.nothing + end + end + + def take + item, _ = self.next + item + end + alias_method :receive, :take + alias_method :~, :take + + def take! + item, _ = do_next + raise Error if item == Buffer::NO_VALUE + item + end + + def take? + item, _ = self.next? + item + end + + # + # @example + # + # jobs = Channel.new + # + # Channel.go do + # loop do + # j, more = jobs.next + # if more + # print "received job #{j}\n" + # else + # print "received all jobs\n" + # break + # end + # end + # end + def next + item, more = do_next + item = nil if item == Buffer::NO_VALUE + return item, more + end + + def next? + item, more = do_next + item = if item == Buffer::NO_VALUE + Concurrent::Maybe.nothing + else + Concurrent::Maybe.just(item) + end + return item, more + end + + def poll + (item = do_poll) == Buffer::NO_VALUE ? nil : item + end + + def poll! + item = do_poll + raise Error if item == Buffer::NO_VALUE + item + end + + def poll? + if (item = do_poll) == Buffer::NO_VALUE + Concurrent::Maybe.nothing + else + Concurrent::Maybe.just(item) + end + end + + def each + raise ArgumentError.new('no block given') unless block_given? + loop do + item, more = do_next + if item != Buffer::NO_VALUE + yield(item) + elsif !more + break + end + end + end + + def close + @buffer.close + end + alias_method :stop, :close + + class << self + def timer(seconds) + Channel.new(Buffer::Timer.new(seconds)) + end + alias_method :after, :timer + + def ticker(interval) + Channel.new(Buffer::Ticker.new(interval)) + end + alias_method :tick, :ticker + + def select(*args) + raise ArgumentError.new('no block given') unless block_given? + selector = Selector.new + yield(selector, *args) + selector.execute + end + alias_method :alt, :select + + def go(*args, &block) + go_via(GOROUTINES, *args, &block) + end + + def go_via(executor, *args, &block) + raise ArgumentError.new('no block given') unless block_given? + executor.post(*args, &block) + end + + def go_loop(*args, &block) + go_loop_via(GOROUTINES, *args, &block) + end + + def go_loop_via(executor, *args, &block) + raise ArgumentError.new('no block given') unless block_given? + executor.post(block, *args) do + loop do + break unless block.call(*args) + end + end + end + end + + private + + def validate(value, allow_nil, raise_error) + if !allow_nil && value.nil? + raise_error ? raise(ValidationError.new('nil is not a valid value')) : false + elsif !@validator.call(value) + raise_error ? raise(ValidationError) : false + else + true + end + rescue => ex + # the validator raised an exception + return raise_error ? raise(ex) : false + end + + def do_put(item) + @buffer.put(item) + end + + def do_offer(item) + @buffer.offer(item) + end + + def do_next + @buffer.next + end + + def do_poll + @buffer.poll + end + end +end