Sha256: acef1e8c1ac222223ddcbac1d0ebc96514372517153fc3482857efe12a421195
Contents?: true
Size: 1.67 KB
Versions: 3
Compression:
Stored size: 1.67 KB
Contents
require 'thread'
require 'observer'

require 'concurrent/runnable'

module Concurrent

  # A mailbox-driven worker. Messages are appended to an internal +Queue+ and
  # taken off one at a time by #on_task, which hands each one to #act and then
  # notifies any registered observers with the outcome.
  #
  # Subclasses are expected to override #act; the default implementation raises.
  #
  # http://www.scala-lang.org/api/current/index.html#scala.actors.Actor
  class Actor
    include Observable
    include Runnable

    # Write-only handle onto a mailbox shared by several actors (see Actor.pool).
    # Unlike Actor#post it performs no +running?+ check before enqueueing.
    class Poolbox

      # @param queue [Queue] the shared mailbox to push messages onto
      def initialize(queue)
        @queue = queue
      end

      # Enqueues the arguments as a single message.
      #
      # @param message [Array] the arguments later splatted into Actor#act
      # @return [Integer] the queue length read immediately after the push
      def post(*message)
        @queue.push(message)
        @queue.length
      end

      # Chainable form of #post. An Array argument is splatted, so its
      # elements become the individual arguments of the message.
      #
      # @return [Poolbox] self
      def <<(message)
        post(*message)
        self
      end
    end

    # Builds +count+ actors that all read from one shared mailbox.
    #
    # @param count [Integer] number of actors to create
    # @param block forwarded untouched to +new+ for every actor
    # @return [Array] a two-element array: the Poolbox used to post to the
    #   shared mailbox, and the Array of created actors
    # @raise [ArgumentError] unless +count+ is greater than zero
    #
    # NOTE(review): #on_run and #on_stop clear +@queue+, which for pooled actors
    # is the shared mailbox -- confirm against Runnable when those hooks fire.
    def self.pool(count, &block)
      raise ArgumentError, 'count must be greater than zero' unless count > 0

      mailbox = Queue.new
      actors = count.times.map do
        # Swap the private queue made by #initialize for the shared mailbox.
        new(&block).tap { |actor| actor.instance_variable_set(:@queue, mailbox) }
      end

      [Poolbox.new(mailbox), actors]
    end

    def initialize
      @queue = Queue.new
    end

    # Enqueues the arguments as a single message, but only while the actor
    # reports itself as running (+running?+ is presumably supplied by Runnable).
    #
    # @param message [Array] the arguments later splatted into #act
    # @return [Integer, false] the queue length read right after the push, or
    #   +false+ when the actor is not running and nothing was enqueued
    def post(*message)
      return false unless running?

      @queue.push(message)
      @queue.length
    end

    # Chainable form of #post. An Array argument is splatted, so its elements
    # become the individual arguments of the message.
    #
    # @return [Actor] self, whether or not the message was accepted
    def <<(message)
      post(*message)
      self
    end

    protected

    # Message handler; subclasses must override it.
    #
    # @param args [Array] the arguments originally given to #post
    # @raise [NotImplementedError] always, in this base implementation
    def act(*args)
      raise NotImplementedError, "#{self.class} does not implement #act"
    end

    # Empties the mailbox. Presumably a Runnable start-up hook -- TODO confirm.
    #
    # @private
    def on_run # :nodoc:
      @queue.clear
    end

    # Empties the mailbox, then enqueues the bare +:stop+ sentinel that
    # #on_task treats as "do nothing". Presumably a Runnable shutdown hook.
    #
    # @private
    def on_stop # :nodoc:
      @queue.clear
      @queue.push(:stop)
    end

    # Pops one entry from the mailbox and processes it. Posted messages are
    # always Arrays, so the bare +:stop+ Symbol cannot collide with user data.
    #
    # @private
    def on_task # :nodoc:
      message = @queue.pop
      return if message == :stop

      begin
        outcome = act(*message)
        changed
        notify_observers(Time.now, message, outcome)
      rescue => error
        # Errors raised by #act or by an observer are routed to the error hook.
        on_error(Time.now, message, error)
      end
    end

    # Error hook called from #on_task; does nothing unless overridden.
    #
    # @private
    def on_error(time, msg, ex) # :nodoc:
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
concurrent-ruby-0.3.0 | lib/concurrent/actor.rb |
concurrent-ruby-0.3.0.pre.3 | lib/concurrent/actor.rb |
concurrent-ruby-0.3.0.pre.2 | lib/concurrent/actor.rb |