# Author:: Joel Friedman and Patrick Farley
#
# This module is used to simplify concurrency
# in your application. JVM threads and JRetlang are
# used to provide Actor model style asynchronous
# message passing via method calls. Named channel based
# message passing is also supported via +register_channel+ and
# the :channel parameter on +mailslot+.
require 'rubygems'
require 'jretlang'
require File.dirname(__FILE__) + '/synchronized'
module Mailbox
include Synchronized
# Register your jretlang channel as a named channel
def register_channel(channel_name, channel)
channel_registry = self.class.__channel_registry__
channel_registry.select { |k,v| v[:channel] == channel_name }.each do |k,v|
v[:replyable] ? __subscribe_with_single_reply__(channel, k) : __subscribe__(channel, k)
end
end
class << self
# Used to tell +Mailbox+ that all +mailslot+
# methods should be run on the calling thread.
#
# *** Intended for synchronus unit testing of concurrent apps***
attr_accessor :synchronous
end
private
def self.included(base)
base.extend(Mailbox::ClassMethods)
end
def __subscribe__(channel, method)
channel.subscribe_on_fiber(__fiber__) { |*args| self.send(method, *args) }
end
def __subscribe_with_single_reply__(channel, method)
channel.subscribe(__fiber__) do |message|
message.reply(self.send(method))
end
end
def __synchronous_fiber__
executor = JRL::SynchronousDisposingExecutor.new
fiber = JRL::Fibers::ThreadFiber.new executor, "synchronous_thread", true
end
def __started_fiber__
fiber = Mailbox.synchronous == true ? __synchronous_fiber__ : JRL::Fibers::ThreadFiber.new
fiber.start
fiber
end
def __fiber__
@fiber ||= __started_fiber__
end
module ClassMethods
include Synchronized::ClassMethods
# Used within +Mailbox+ module
attr_accessor :__channel_registry__
# Notifies Mailbox that the next method added
# will be a +mailslot+. If :channel is provided
# the next method will become a subscriber on the channel.
# Channel based +mailslot+ methods are also made private
# to discourage direct invocation
def mailslot(params={})
@next_channel_name = params[:channel]
@replyable = params[:replyable]
@mailslot = true
end
private
def method_added(method_name, &block)
return super unless @mailslot == true
@mailslot = false
if @next_channel_name.nil?
__setup_on_fiber__(method_name)
else
__setup_on_channel__(method_name, @replyable)
end
super
end
def __setup_on_fiber__(method_name)
return super if @is_adding_mailbox_to_method
alias_method :"__#{method_name}__", method_name
@is_adding_mailbox_to_method = true
define_method( method_name, lambda do |*args|
__fiber__.execute { self.send(:"__#{method_name}__", *args ) }
end )
@is_adding_mailbox_to_method = false
end
def __setup_on_channel__(method_name, replyable)
private method_name
@__channel_registry__ ||= {}
__channel_registry__[method_name] = { :channel => @next_channel_name, :replyable => replyable }
@replyable = nil
@next_channel_name = nil
end
end
end