# = Cluster
#
# TODO: use Sync instead of Monitor
#
# code:: gmosx
#
# (c) 2004 Navel, all rights reserved.
# $Id: cluster.rb 74 2004-10-18 11:30:20Z gmosx $

$LOAD_PATH.unshift "nitro/lib"

require "drb"
require "monitor"

require "n/application"
require "n/server"
require "n/utils/cache"
require "n/app/session"

module N
  # = Cluster
  #
  # A Cluster is a collection of servers. The cluster synchronizes the
  # servers and distributes the state. An older version used a polling
  # system, ie: the servers polled the cluster to obtain the state. This
  # version uses a push system, ie when the state is changed a delta
  # is pushed to the clients.
  #
  class Cluster < N::Application
    # = CHash ClusterHash
    #
    # The cluster-side hash. It is published over DRb and pushes every
    # stored pair to the servers that have joined.
    #
    class CHash < Hash
      # Monitor guarding this hash and the member table.
      attr_reader :mon

      # Value returned by DRb.start_service for this hash (local).
      attr_accessor :ldrb

      # The member servers as { drb_uri => DRbObject }. A hash is used
      # so that there is at most one entry per drb_uri.
      attr_accessor :cluster

      # Publishes this hash as the DRb front object at +ldrb_uri+ and
      # starts with an empty member table.
      #
      def initialize(ldrb_uri = "druby://:8000")
        @mon = Monitor.new
        @ldrb = DRb.start_service(ldrb_uri, self)
        @cluster = {}
      end

      # Registers the server reachable at +sdrb_uri+ as a member.
      # Returns the DRbObject proxy created for it.
      #
      def join(sdrb_uri)
        @mon.synchronize do
          cluster[sdrb_uri] = DRbObject.new(nil, sdrb_uri)
        end
      end

      alias_method :old_set, :[]=

      # Stores the pair and pushes it to every member server.
      # Not really useful; servers are expected to call #cluster_sync.
      #
      def []=(key, value)
        broadcast(key, value)
      end

      # Stores the pair and pushes it to every member server except the
      # one registered under +server_uri+.
      #
      # Use this, avoids syncing the original server, and avoids a
      # nasty deadlock.
      #
      def cluster_sync(key, value, server_uri)
        broadcast(key, value, server_uri)
      end

      # Synchronized lookup in the cluster hash.
      #
      def [](key)
        @mon.synchronize do
          puts "LOOKUP #{key}"
          super
        end
      end

      private

      # Shared implementation of #[]= and #cluster_sync: stores the
      # value (useful on server restarts) and calls server_sync on each
      # member whose uri is not listed in +skip_uris+. A member whose
      # server_sync raises is logged and removed from the member table.
      # Returns the member table.
      #
      def broadcast(key, value, *skip_uris)
        @mon.synchronize do
          old_set(key, value)
          puts "CLUSTER #{key} = #{value}"

          # delete_if prunes failed members as part of the iteration
          # instead of calling delete from inside an each loop.
          cluster.delete_if do |uri, sdrb|
            begin
              sdrb.server_sync(key, value) unless skip_uris.include?(uri)
              false
            rescue StandardError
              $log.error "Server at #{uri} is down, removing from cluster"
              true
            end
          end
        end
      end
    end

    # = SHash ServerHash
    #
    # The server-side hash. Writes are stored locally and forwarded to
    # the cluster hash; reads fall back to the cluster hash.
    #
    # NOTE(review): #[]= and #[] call the cluster over DRb while holding
    # the local monitor, and #server_sync (called by the cluster) takes
    # the same monitor. This looks like it could deadlock when the
    # cluster pushes to this server during such a call -- confirm.
    #
    class SHash < Hash
      # Monitor guarding the local hash.
      attr_reader :mon

      # Value returned by DRb.start_service for this hash (local).
      attr_accessor :ldrb

      # DRbObject proxy for the cluster hash.
      attr_accessor :cdrb

      # Publishes this hash over DRb and joins the cluster.
      #
      # ldrb_uri = local drb uri
      # cdrb_uri = cluster drb uri
      #
      def initialize(ldrb_uri = "druby://:9000", cdrb_uri = "druby://:8000")
        @mon = Monitor.new
        @ldrb_uri = ldrb_uri
        @ldrb = DRb.start_service(ldrb_uri, self)
        @cdrb = DRbObject.new(nil, cdrb_uri)
        @cdrb.join(ldrb_uri)
      end

      alias_method :old_set, :[]=

      # Stores the value in the local hash, then hands it to the cluster
      # together with this server's uri so the cluster can skip it.
      #
      def []=(key, value)
        @mon.synchronize do
          puts "LOCAL #{key} = #{value}"
          old_set(key, value)
          @cdrb.cluster_sync(key, value, @ldrb_uri)
        end
      end

      # If the key is not found in the local hash, try the
      # cluster hash. A value obtained from the cluster is stored
      # locally; a nil answer is returned but not stored, so a miss
      # does not leave an empty entry behind.
      #
      def [](key)
        @mon.synchronize do
          value = super
          unless value
            value = @cdrb[key]
            old_set(key, value) unless value.nil?
          end
          value
        end
      end

      # Called by the cluster. Stores the pair locally without
      # forwarding it back.
      #
      def server_sync(key, value)
        puts "SYNC #{key} = #{value}"
        @mon.synchronize do
          old_set(key, value)
        end
      end
    end

    # = Clm Cluster Last Modified Hash
    #
    class Clm < CHash
      # Returns the value stored for +key+. When there is none, the
      # current time (integer seconds) is stored locally and returned.
      #
      def [](key)
        @mon.synchronize do
          value = super
          unless value
            puts "INIT #{key}"
            value = Time.now.to_i
            old_set(key, value)
          end
          value
        end
      end
    end

    # = Slm Server Last Modified Hash
    #
    class Slm < SHash
      # Stores +lm+ for +key+, defaulting to the current time (integer
      # seconds) when +lm+ is not given. Returns the stored value.
      #
      def set!(key, lm = nil)
        lm ||= Time.now.to_i
        self[key] = lm
        lm
      end
    end

    def initialize(name = "Cluster")
      super
    end

    # Creates the cluster last-modified hash (which publishes itself over
    # DRb at its default uri), publishes a session manager at
    # druby://:8001 and then sleeps forever.
    #
    def run
      N::Cluster::Clm.new
      DRb.start_service("druby://:8001", N::App::SessionManager.new)

      # The original followed an endless `while true` loop with a call
      # to super that could never be reached; it has been dropped.
      loop { sleep(5000) }
    end
  end
end # module

if $PROGRAM_NAME == __FILE__
  require "logger"
  $log = Logger.new(STDERR)
  N::Cluster.new.exec
end