# Copyright: Copyright (c) 2004 Nicolas Despres. All rights reserved.
# Author: Nicolas Despres .
# License: Gnu General Public License.

# $LastChangedBy: ertai $
# $Id: dispatcher.rb 186 2005-04-03 00:07:45Z ertai $


require 'drb'
require 'session/client'


module Session

  # Keeps a table of servers indexed by URI, with one client object per
  # server, and sends a same request to a set of these clients using a
  # bounded number of worker threads.
  #
  # Any method unknown to the dispatcher is forwarded to every registered
  # client (see method_missing).
  class Dispatcher
    # Per the DRb documentation, including DRbUndumped makes instances be
    # passed by reference (not marshalled) over DRb.
    include DRb::DRbUndumped

    #
    # Constants
    #

    # Default number of worker threads used by dispatch.
    MAX_JOBS = 5

    # Largest value accepted for max_jobs.
    MAX_JOBS_LIMIT = 100

    # Default server table: uri => { :usrname, :passwd }.
    SRVDB =
      {
        'druby://localhost:42000' =>
        {
          :usrname => 'guest',
          :passwd => 'guest'.crypt(Const::SALT_KEY)
        }
      }

    # Class instantiated for each server unless another one is given.
    BASE_CLIENT_CLASS = Client

    #
    # Attributes
    #

    # Maximum number of threads started by one dispatch call.
    attr_reader :max_jobs

    #
    # Constructor
    #

    # Build a dispatcher and register one client per entry of +srvdb+.
    #
    # srvdb::        hash of uri => { :usrname => ..., :passwd => ... }.
    # max_jobs::     thread limit, must be in 1..MAX_JOBS_LIMIT.
    # client_class:: class instantiated for every server.
    # args::         extra arguments handed to client_class.new.
    #
    # Raises ArgumentError when max_jobs is outside 1..MAX_JOBS_LIMIT.
    def initialize(srvdb=SRVDB,
                   max_jobs=MAX_JOBS,
                   client_class=BASE_CLIENT_CLASS,
                   *args)
      # The two bounds are reported separately so that the message matches
      # the actual problem (a value below 1 is not "too big").
      unless 1 <= max_jobs
        raise(ArgumentError,
              "`#{max_jobs}' - too small max jobs (must be >= 1)")
      end
      unless max_jobs <= MAX_JOBS_LIMIT
        raise(ArgumentError,
              "`#{max_jobs}' - too big max jobs (must be <= #{MAX_JOBS_LIMIT})")
      end
      @max_jobs = max_jobs
      @srvdb = {}
      srvdb.each do |uri, srv|
        add(uri, srv[:usrname], srv[:passwd], client_class, *args)
      end
    end

    #
    # Methods
    #

    # Send +request+ (with +args+ and +block+) to every client of +clts+,
    # using at most min(clts.size, max_jobs) threads.
    #
    # Returns a hash mapping each client uri to the value returned by the
    # client, or to the exception it raised (see request).
    #
    # NOTE: a nil entry in +clts+ is taken by a worker as "queue empty" and
    # makes that worker stop.
    def dispatch(clts, request, *args, &block)
      mutex = Mutex.new
      nb_threads = [clts.size, @max_jobs].min
      clts_q = clts.dup   # work queue; the caller's array is left untouched
      result = {}
      threads = Array.new(nb_threads) do
        Thread.new do
          loop do
            clt = mutex.synchronize { clts_q.shift }
            break if clt.nil?
            # With parentheses this calls the #request method; the local
            # +request+ holds the name of the message to send.
            res = request(clt, request, *args, &block)
            mutex.synchronize { result.update(res) }
          end
        end
      end
      threads.each { |thread| thread.join }
      result
    end
    alias update dispatch

    # Return the array of clients for which the block +comp+ is true.
    def select(&comp)
      clts = []
      each_client { |uri, clt| clts << clt if comp[clt] }
      clts
    end

    # Return every registered client.
    def select_all
      select { |clt| true }
    end

    # Return the clients whose attribute +attr+ matches +value+
    # (see test_client_attribut for the matching rule).
    def select_by(attr, value)
      select { |clt| test_client_attribut(clt, attr, value) }
    end

    # Return the clients selected by folding +bool_fun+ over the tests
    # described by +hash+ (attribute => expected value), starting from
    # +init_val+.  +bool_fun+ receives the accumulator and the result of
    # the current test.
    def multi_select_by(hash, init_val=true, &bool_fun)
      select do |clt|
        hash.inject(init_val) do |ret, (k, v)|
          bool_fun[ret, test_client_attribut(clt, k, v)]
        end
      end
    end

    # Return the clients matching every attribute => value pair of +hash+
    # (all of them when +hash+ is empty).
    def and_multi_select_by(hash)
      select do |clt|
        hash.all? { |k, v| test_client_attribut(clt, k, v) }
      end
    end

    # Return the clients matching at least one attribute => value pair of
    # +hash+ (none of them when +hash+ is empty).
    def or_multi_select_by(hash)
      select do |clt|
        hash.any? { |k, v| test_client_attribut(clt, k, v) }
      end
    end

    # Dispatch +request+ to every registered client.
    def dispatch_all(request, *args, &block)
      dispatch(select_all, request, *args, &block)
    end

    # Forward any method unknown to the dispatcher to all the clients.
    #
    # NOTE(review): every unknown message is forwarded, including
    # misspelled dispatcher methods; no respond_to_missing? is defined.
    def method_missing(request, *args, &block)
      dispatch_all(request, *args, &block)
    end

    # overload me
    #
    # Always raises NotImplementedError at this level.
    def choose(clients)
      raise(NotImplementedError, "`choose' - not implemented at this level")
    end

    # Apply choose to the clients selected by +block+.
    def choose_and_select(&block)
      choose(select(&block))
    end

    # Yield each (uri, server description) pair.
    def each
      @srvdb.each { |uri, desc| yield(uri, desc) }
    end

    # Yield each (uri, client) pair.
    def each_client
      @srvdb.each { |uri, desc| yield(uri, desc[:client]) }
    end

    # Return the description hash registered for +uri+, or nil.
    def [](uri)
      @srvdb[uri]
    end

    # Register the server +uri+ and create its client with
    # client_class.new(uri, usrname, passwd, *args).
    #
    # Returns nil, without touching the table, if +uri+ is already
    # registered; otherwise returns the new description hash.
    def add(uri, usrname, passwd, client_class, *args)
      return nil if @srvdb[uri]
      @srvdb[uri] =
        {
          :usrname => usrname,
          :passwd => passwd,
          :client => client_class.new(uri, usrname, passwd, *args)
        }
    end

    # Remove +uri+ from the table; returns the removed description or nil.
    def del(uri)
      @srvdb.delete(uri)
    end

    # Send +request+ to the single client +clt+.
    #
    # Returns { clt.uri => value }, where value is what the client
    # returned, or the exception object if the call raised.
    def request(clt, request, *args, &block)
      res = nil
      begin
        res = clt.send(request, *args, &block)
      # Every exception class is captured and reported as the result for
      # this client.
      # NOTE(review): this also captures non-StandardError exceptions;
      # presumably intended so one failing client cannot abort a whole
      # dispatch -- confirm before narrowing.
      rescue Exception => err
        res = err
      end
      { clt.uri => res }
    end

    protected

    # Test the attribute +attr+ of +clt+ against +value+: a Regexp is
    # matched against the attribute converted to a string, anything else
    # is compared with ==.
    def test_client_attribut(clt, attr, value)
      if value.is_a? Regexp
        clt.send(attr).to_s =~ value
      else
        clt.send(attr) == value
      end
    end

  end # class Dispatcher

end # module Session