lib/celluloid/receivers.rb in celluloid-0.6.2 vs lib/celluloid/receivers.rb in celluloid-0.7.0
- old
+ new
@@ -1,36 +1,66 @@
+require 'set'
+
module Celluloid
# Allow methods to directly interact with the actor protocol
class Receivers
def initialize
- @handlers = []
+ @receivers = Set.new
+ @timers = Timers.new
end
# Receive an asynchronous message
- def receive(&block)
- raise ArgumentError, "receive must be given a block" unless block
+ def receive(timeout = nil, &block)
+ receiver = Receiver.new block
- @handlers << [Fiber.current, block]
- Fiber.yield
+ if timeout
+ receiver.timer = @timers.add(timeout) do
+ @receivers.delete receiver
+ receiver.resume
+ end
+ end
+
+ @receivers << receiver
+ Task.suspend
end
+ # How long to wait until the next timer fires
+ def wait_interval
+ @timers.wait_interval
+ end
+
+ # Fire any pending timers
+ def fire_timers
+ @timers.fire
+ end
+
# Handle incoming messages
def handle_message(message)
- handler = nil
+ receiver = @receivers.find { |r| r.match(message) }
+ return unless receiver
- @handlers.each_with_index do |(fiber, block), index|
- if block.call(message)
- handler = index
- break
- end
- end
+ @receivers.delete receiver
+ @timers.cancel receiver.timer if receiver.timer
+ receiver.resume message
+ end
+ end
- if handler
- fiber, _ = @handlers.delete_at handler
- fiber.resume message
- true
- else
- false
- end
+ # Methods blocking on a call to receive
+ class Receiver
+ attr_accessor :timer
+
+ def initialize(block)
+ @block = block
+ @task = Task.current
+ @timer = nil
+ end
+
+ # Match a message with this receiver's block
+ def match(message)
+ @block.call(message) if @block
+ end
+
+ def resume(message = nil)
+ @task.resume message
end
end
end