# Copyright: Copyright (c) 2004 Nicolas Despres. All rights reserved. # Author: Nicolas Despres . # License: Gnu General Public License. # $LastChangedBy: ertai $ # $Id: dispatcher.rb 186 2005-04-03 00:07:45Z ertai $ require 'drb/drb' module DRb class Dispatcher URIS = [] MAX_JOB = 5 MAX_JOB_LIMIT = 100 DRB_OBJECT_CLASS = DRbObject attr_reader :max_job def initialize(uris=URIS, max_job=MAX_JOB, drb_object_class=DRB_OBJECT_CLASS, *args, &block) unless 1 <= max_job && max_job <= MAX_JOB_LIMIT raise(ArgumentError, "`#{max_job}' - too big max job (must be <= #{MAX_JOB_LIMIT})") end @max_job = max_job @uris = {} uris.each { |uri| add(uri, drb_object_class, *args, &block) } end def [](uri) @uris[uri] end def add(uri, drb_object_class, *args, &block) @uris[uri] = drb_object_class.new(nil, uri, *args, &block) end def del(uri) @uris.delete(uri) end def each @uris.each { |uri, ro| yield(uri, ro) } end def dispatch(ros, meth, *args, &block) result = {} nb_threads = [ ros.size, @max_job].min if nb_threads == 1 res = request(ros[0], meth, *args, &block) result.update(res) else mutex = Mutex.new threads = [] ros_q = ros.dup nb_threads.times do threads << Thread.new do loop do ro = mutex.synchronize { ros_q.shift } break if ro.nil? res = request(ro, meth, *args, &block) mutex.synchronize { result.update(res) } end end end threads.each { |thread| thread.join } end result end alias update dispatch def select(&comp) ros = [] each { |uri, ro| ros << ro if comp[ro] } ros end def select_all select { |ro| true } end def select_by(result, meth, *args, &block) select { |ro| test_ro_attribut(ro, result, meth, *args, &block) } end def multi_select_by(hash, init_val=true, &bool_fun) select do |ro| ret = init_val hash.each do |k, v| ret = bool_fun[ret, test_ro_attribut(ro, v, k)] end ret end end def and_multi_select_by(hash) select do |ro| ret = true hash.each do |k, v| ret = (ret and test_ro_attribut(ro, v, k)) end ret end end def or_multi_select_by(hash) select do |ro| ret = false hash.each { |k, v| ret = (ret or test_ro_attribut(ro, v, k)) } ret end end def dispatch_all(meth, *args, &block) dispatch(select_all, meth, *args, &block) end def method_missing(meth, *args, &block) dispatch_all(meth, *args, &block) end def request(ro, meth, *args, &block) res = nil begin res = ro.send(meth, *args, &block) rescue Exception => err res = err end { ro.__drburi => res } end protected def test_ro_attribut(ro, result, meth, *args, &block) res = request(ro, meth, *args, &block) if result.is_a? Regexp res[ro.__drburi].to_s =~ result else res[ro.__drburi] == result end end end # class Dispatcher end # module DRb if defined? TEST_MODE or $0 == __FILE__ require 'test/unit/ui/yaml/testrunner' require 'rbconfig' require 'drb/session_manager' module DRb class DispatcherTest < ::Test::Unit::TestCase class MyDRbObject < DRbObject attr_reader :uri, :hostname, :port, :protocol def initialize(obj, uri=nil) super if uri =~ /(^.*?):\/\/(.*?):(\d*)$/ @protocol = $1 @hostname = $2 @port = $3.to_i end end end # # Tests # def test_multi_select_by n = 5 uris = [] for i in 0...n do uris << "druby://localhost:#{42000 + i * 100}" end d = nil assert_nothing_raised { d = Dispatcher.new(uris, 1, MyDRbObject) } assert_not_nil(d) h1 = { :port => 42000, :hostname => 'localhost' } r1 = d.and_multi_select_by(h1) assert_equal(1, r1.size) assert_equal(d['druby://localhost:42000'], r1[0]) r2 = d.multi_select_by(h1) { |a, b| (a and b) } assert_equal(1, r2.size) assert_equal(r1, r2) r1 = d.or_multi_select_by(h1) r2 = d.multi_select_by(h1, false) { |a, b| (a or b) } assert_equal(r1, r2) end def test_dispatch name, ruby_bin, chdir = add_command drb_session_manager = SessionManager.new server = DRb.start_service('druby://localhost:0', drb_session_manager) services = [] uris = [] nb_srv = 10 nb_srv.times do |i| service = drb_session_manager.service(name) uris << service.server.uri services << service end until (n = drb_session_manager.nb_sessions) == nb_srv STDERR.puts "wait for #{n} sessions started" if $VERBOSE sleep(2) end d = Dispatcher.new(uris, 5, MyDRbObject) ref = {} uris.each { |uri| ref[uri] = "hello from #{uri}" } assert(ref, d.hello) services.each do |service| service.stop_service end while (n = drb_session_manager.nb_sessions) > 0 STDERR.puts "wait for #{n} sessions stoped" if $VERBOSE sleep(2) end server.stop_service end protected def add_command ruby_bin = Config::CONFIG["RUBY_INSTALL_NAME"] ruby_bin += ' -w' if $VERBOSE name = "dispatcher_server_test" chdir = (defined? TEST_MODE) ? "cd src;" : '' cmd = chdir + "#{ruby_bin} drb/#{name}.rb" DRb::SessionManager.command[name] = cmd [ name, ruby_bin, chdir ] end end # DispatcherTest end # module DRb end