# Author:: Joel Friedman and Patrick Farley
#
# This module is used to simplify concurrency
# in your application. JVM threads and JRetlang are
# used to provide Actor model style asynchronous
# message passing via method calls. Named channel based
# message passing is also supported via +register_channel+ and
# the :channel parameter on +mailslot+.

require 'rubygems'
require 'jretlang'

require File.dirname(__FILE__) + '/synchronized'
require File.dirname(__FILE__) + '/daemon_thread_factory'

module Mailbox
  include Synchronized

  # Register your jretlang channel as a named channel.
  #
  # Every +mailslot+ declared with <tt>:channel => channel_name</tt> on the
  # including class is subscribed to +channel+. Slots declared with
  # <tt>:replyable => true</tt> are subscribed through the single-reply
  # path, all others through the plain fiber subscription.
  #
  # If the class never declared a channel based +mailslot+ there is no
  # registry at all; that case is treated like "no matching slot" and
  # nothing is subscribed.
  #
  # channel_name:: name given as :channel in the +mailslot+ declaration
  # channel::      the JRetlang channel object to subscribe on
  def register_channel(channel_name, channel)
    # The registry is only created by the first channel based mailslot,
    # so it is nil for classes that have none.
    registry = self.class.__channel_registry__ || {}

    registry.select { |_slot, options| options[:channel] == channel_name }.each do |slot, options|
      if options[:replyable]
        __subscribe_with_single_reply__(channel, slot)
      else
        __subscribe__(channel, slot)
      end
    end
  end

  # Names a method on this object that receives trace messages.
  #
  # Once set, fiber based +mailslot+ methods send "enqueued <name>" to that
  # method when they are called and "dequeued <name>" when they start
  # running on the fiber.
  def verbose_output_to(method_name)
    @__verbose_target__ = method_name
  end

  class << self
    # Used to tell +Mailbox+ that all +mailslot+
    # methods should be run on the calling thread.
    #
    # *** Intended for synchronous unit testing of concurrent apps***
    attr_accessor :synchronous
  end

  # Hook run when Mailbox is included: adds the +mailslot+ DSL
  # (Mailbox::ClassMethods) to the including class.
  def self.included(base)
    base.extend(Mailbox::ClassMethods)
  end

  private

  # Subscribes +method+ to +channel+ on this object's fiber; whatever
  # arguments the channel hands to the block are forwarded to the method.
  def __subscribe__(channel, method)
    channel.subscribe_on_fiber(__fiber__) do |*args|
      send(method, *args)
    end
  end

  # Subscribes +method+ to +channel+ as a request handler: the method is
  # called without arguments and its return value is passed to
  # +message.reply+.
  def __subscribe_with_single_reply__(channel, method)
    channel.subscribe(__fiber__) do |message|
      message.reply(send(method))
    end
  end

  # Builds (but does not start) the fiber used when Mailbox.synchronous
  # is enabled. It is backed by JRL::SynchronousDisposingExecutor.
  def __synchronous_fiber__
    executor = JRL::SynchronousDisposingExecutor.new
    fiber_name = "#{self.class.name} #{self.object_id} Mailbox synchronous"
    JRL::Fibers::ThreadFiber.new(executor, fiber_name, true)
  end

  # Builds (but does not start) the regular fiber for this object.
  # A class level fiber factory (see +mailbox_thread_pool_size+) wins over
  # the default dedicated ThreadFiber.
  def __create_fiber__
    factory = self.class.__fiber_factory__
    return factory.create if factory

    fiber_name = "#{self.class.name} #{self.object_id} Mailbox"
    JRL::Fibers::ThreadFiber.new(JRL::RunnableExecutorImpl.new, fiber_name, true)
  end

  # Creates the appropriate fiber, starts it and returns it.
  def __started_fiber__
    # Only the literal value true switches to synchronous mode.
    fiber = Mailbox.synchronous == true ? __synchronous_fiber__ : __create_fiber__
    fiber.start
    fiber
  end

  # The fiber all of this object's mailslots run on; created and started
  # lazily on first use and then reused.
  def __fiber__
    @fiber ||= __started_fiber__
  end

  module ClassMethods
    include Synchronized::ClassMethods

    # Used within +Mailbox+ module
    attr_accessor :__channel_registry__

    # Notifies Mailbox that the next method added
    # will be a +mailslot+. If :channel is provided
    # the next method will become a subscriber on the channel.
    # Channel based +mailslot+ methods are also made private
    # to discourage direct invocation. :exception
    # can be provided as the symbol for a method to handle
    # any exceptions that occur in your +mailslot+. This
    # method will be passed the exception that was raised.
    #
    # :replyable makes the caller wait for the +mailslot+ to finish and
    # hands back its return value. :timeout (in seconds) limits that wait;
    # without it the caller waits indefinitely.
    def mailslot(params={})
      @next_channel_name = params[:channel]
      @replyable = params[:replyable]
      # Stored in milliseconds; -1 is the "no timeout" marker.
      @timeout = params[:timeout].nil? ? -1 : params[:timeout] * 1000
      @exception = params[:exception]
      @mailslot = true
    end

    # Makes instances of this class obtain their fibers from a pool fiber
    # factory backed by a fixed size thread pool of +count+ threads
    # (threads are produced by DaemonThreadFactory).
    def mailbox_thread_pool_size(count)
      pool = JRL::Concurrent::Executors.new_fixed_thread_pool(count, DaemonThreadFactory.new)
      @__fiber_factory__ = JRL::Fibers::PoolFiberFactory.new(pool)
    end

    # The fiber factory set by +mailbox_thread_pool_size+, or nil.
    def __fiber_factory__
      @__fiber_factory__ ||= nil
    end

    private

    # Ruby hook, called for every method defined on the class. Only the
    # first method defined after a +mailslot+ call is turned into a slot;
    # everything else is passed straight up the hook chain.
    def method_added(method_name, &block)
      return super unless __mailslot__ == true

      # Clear the flag first: the alias_method/define_method calls made
      # while wiring the slot re-enter this hook and must be ignored.
      @mailslot = false

      if @next_channel_name.nil?
        __setup_on_fiber__(method_name, @replyable, @timeout)
      else
        __setup_on_channel__(method_name, @replyable)
      end

      super
    end

    # Replaces +method_name+ with a wrapper that runs the original
    # implementation (kept under the alias <tt>__<method_name>__</tt>) on
    # the instance's fiber.
    #
    # replyable:: when truthy the wrapper blocks until the slot has run
    # timeout::   maximum wait in milliseconds, or -1 to wait forever
    def __setup_on_fiber__(method_name, replyable, timeout)
      # NOTE(review): no +__setup_on_fiber__+ is visible further up the
      # ancestor chain, so this +super+ looks like it would raise if the
      # guard ever fired - confirm against Synchronized::ClassMethods.
      return super if __is_adding_mailbox_to_method__

      alias_method :"__#{method_name}__", method_name
      @is_adding_mailbox_to_method = true

      # Capture the handler for this slot and clear it so it does not
      # leak into the next mailslot declaration.
      exception_method, @exception = @exception, nil

      define_method method_name do |*args|
        send(@__verbose_target__, "enqueued #{method_name}") if defined? @__verbose_target__

        result = nil
        latch = JRL::Concurrent::CountDownLatch.new(1) if replyable

        __fiber__.execute do
          begin
            send(@__verbose_target__, "dequeued #{method_name}") if defined? @__verbose_target__
            result = send(:"__#{method_name}__", *args)
          rescue Exception => ex
            # Without a handler the error is re-raised inside the fiber
            # task; with one, the handler gets it and result stays nil.
            raise if exception_method.nil?
            send(:"#{exception_method}", ex)
          ensure
            # Always release a waiting caller, even after a failure.
            latch.count_down if replyable
          end
        end

        is_timeout = false
        if replyable
          if timeout == -1
            latch.await
          else
            # The timed await reports false when the time ran out.
            is_timeout = !latch.await(timeout, JRL::Concurrent::TimeUnit::MILLISECONDS)
          end
        end

        # Raised as a plain Exception, so a bare +rescue+ will not catch it.
        raise Exception.new("#{method_name} message timeout after #{timeout/1000} seconds") if is_timeout

        # Non replyable slots do not wait: this is nil unless the slot
        # has already run by now.
        result
      end

      @replyable = false
      @is_adding_mailbox_to_method = false
    end

    # Makes +method_name+ private and records it in the channel registry
    # so that +register_channel+ can subscribe it later.
    def __setup_on_channel__(method_name, replyable)
      private method_name
      @__channel_registry__ ||= {}
      __channel_registry__[method_name] = { :channel => @next_channel_name, :replyable => replyable }
      @replyable = nil
      @next_channel_name = nil
    end

    # True when the next method added should become a mailslot.
    def __mailslot__
      @mailslot ||= false
    end

    # True while +__setup_on_fiber__+ is in the middle of wrapping a method.
    def __is_adding_mailbox_to_method__
      @is_adding_mailbox_to_method ||= false
    end
  end
end