Sha256: 71df83b07a186b7926d8898c05e3d820f0bb3209c1da6cbb31e10d9cd32d5246
Contents?: true
Size: 1.88 KB
Versions: 2
Compression:
Stored size: 1.88 KB
Contents
require 'concurrent/maybe'
require 'concurrent/channel/selector/after_clause'
require 'concurrent/channel/selector/default_clause'
require 'concurrent/channel/selector/put_clause'
require 'concurrent/channel/selector/take_clause'

module Concurrent
  class Channel

    # Accumulates a list of clauses (take, put, after, default) plus an
    # optional error handler, and then runs them with {#execute} until one
    # of the clauses reports a usable result.
    #
    # @!visibility private
    class Selector

      # Starts with an empty clause list and no error handler.
      def initialize
        @error_handler = nil
        @clauses = []
      end

      # Registers a clause by action name.
      #
      # @param channel the channel handed to the clause
      # @param action [Symbol] :take, :poll, :receive or :~ register a take
      #   clause; :put, :offer, :send or :<< register a put clause
      # @param message the value handed to the put clause (unused for takes)
      # @raise [ArgumentError] when +action+ is none of the symbols above
      def case(channel, action, message = nil, &block)
        case action
        when :take, :poll, :~, :receive
          take(channel, &block)
        when :put, :offer, :<<, :send
          put(channel, message, &block)
        else
          raise ArgumentError, 'invalid action'
        end
      end

      # Appends a TakeClause built from +channel+ and the given block.
      #
      # @raise [ArgumentError] when called without a block
      def take(channel, &block)
        raise ArgumentError, 'no block given' if block.nil?
        @clauses.push(TakeClause.new(channel, block))
      end
      alias receive take

      # Appends a PutClause built from +channel+, +message+ and the block.
      # The block is not checked here, so it may be omitted (nil is passed on).
      def put(channel, message, &block)
        @clauses.push(PutClause.new(channel, message, block))
      end
      # NOTE: this alias replaces the inherited Object#send on Selector
      # instances with #put.
      alias send put

      # Appends an AfterClause built from +seconds+ and the block.
      # The block is not checked here, so it may be omitted (nil is passed on).
      def after(seconds, &block)
        @clauses.push(AfterClause.new(seconds, block))
      end
      alias timeout after

      # Appends a DefaultClause built from the given block.
      #
      # @raise [ArgumentError] when called without a block
      def default(&block)
        raise ArgumentError, 'no block given' if block.nil?
        @clauses.push(DefaultClause.new(block))
      end

      # Stores the block that {#execute} calls with any rescued error.
      #
      # @raise [ArgumentError] when called without a block, or when a handler
      #   has already been stored
      def error(&block)
        raise ArgumentError, 'no block given' if block.nil?
        raise ArgumentError, 'only one error handler allowed' unless @error_handler.nil?
        @error_handler = block
      end

      # Sweeps over the clauses in registration order, calling +execute+ on
      # each, until one returns a Concurrent::Maybe whose +just?+ is truthy;
      # that result's +value+ is returned. Thread.pass is called between
      # unsuccessful sweeps.
      #
      # Any StandardError raised along the way is handed to the error handler
      # when one is registered (and the handler's result is returned);
      # without a handler the error is swallowed and nil is returned.
      #
      # NOTE(review): with no clauses registered the sweep never finds a
      # result, so this loop does not terminate on its own.
      def execute
        loop do
          winner = nil
          @clauses.each do |clause|
            outcome = clause.execute
            next unless outcome.just?
            winner = outcome
            break
          end
          # Only a genuine Maybe ends the loop; anything else triggers
          # another sweep.
          return winner.value if winner.is_a?(Concurrent::Maybe)
          Thread.pass
        end
      rescue StandardError => failure
        return nil unless @error_handler
        @error_handler.call(failure)
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
concurrent-ruby-edge-0.2.0.pre4 | lib/concurrent/channel/selector.rb |
concurrent-ruby-edge-0.2.0.pre3 | lib/concurrent/channel/selector.rb |