Sha256: acef1e8c1ac222223ddcbac1d0ebc96514372517153fc3482857efe12a421195

Contents?: true

Size: 1.67 KB

Versions: 3

Compression:

Stored size: 1.67 KB

Contents

require 'thread'
require 'observer'

require 'concurrent/runnable'

module Concurrent

  # http://www.scala-lang.org/api/current/index.html#scala.actors.Actor
  class Actor
    include Observable
    include Runnable

    def initialize
      @queue = Queue.new
    end

    def post(*message)
      return false unless running?
      @queue.push(message)
      return @queue.length
    end

    def <<(message)
      post(*message)
      return self
    end

    def self.pool(count, &block)
      raise ArgumentError.new('count must be greater than zero') unless count > 0
      mailbox = Queue.new
      actors = count.times.collect do
        actor = self.new(&block)
        actor.instance_variable_set(:@queue, mailbox)
        actor
      end
      return Poolbox.new(mailbox), actors
    end

    protected

    def act(*args)
      raise NotImplementedError.new("#{self.class} does not implement #act")
    end

    class Poolbox

      def initialize(queue)
        @queue = queue
      end

      def post(*message)
        @queue.push(message)
        return @queue.length
      end

      def <<(message)
        post(*message)
        return self
      end
    end

    # @private
    def on_run # :nodoc:
      @queue.clear
    end

    # @private
    def on_stop # :nodoc:
      @queue.clear
      @queue.push(:stop)
    end

    # @private
    def on_task # :nodoc:
      message = @queue.pop
      return if message == :stop
      begin
        result = act(*message)
        changed
        notify_observers(Time.now, message, result)
      rescue => ex
        on_error(Time.now, message, ex)
      end
    end

    # @private
    def on_error(time, msg, ex) # :nodoc:
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
concurrent-ruby-0.3.0 lib/concurrent/actor.rb
concurrent-ruby-0.3.0.pre.3 lib/concurrent/actor.rb
concurrent-ruby-0.3.0.pre.2 lib/concurrent/actor.rb