# code: # * George Moschovitis # # (c) 2004 Navel, all rights reserved. # $Id: cluster.rb 155 2004-11-13 20:32:12Z gmosx $ $:.unshift "lib" require "drb" require "monitor" require "nitro/application" require "nitro/server" require "nitro/utils/cache" require "nitro/server/session" module N # = Cluster # # A Cluster manages the state of a collection of servers. The cluster # synchronizes the servers and distributes the state. An older version # used a polling system, ie: the servers polled the cluster to # obtain the state. This version uses a push system, ie when the # state is changed a delta is pushed to the clients. # # TODO: use Sync instead of Monitor # class Cluster < N::Application # = CHash ClusterHash # # The CHash 'endpoint' resides in the Cluster server # class CHash < Hash attr :mon # drbobject for this hash (local) attr_accessor :ldrb # the cluster, use a cluster to implement a set # (one server per drb_uri) attr_accessor :cluster # # def initialize(ldrb_uri = "druby://:8000") @mon = Monitor.new @ldrb = DRb.start_service(ldrb_uri, self) @cluster = {} end # # def join(sdrb_uri) @mon.synchronize { cluster[sdrb_uri] = DRbObject.new(nil, sdrb_uri) } end alias_method :old_set, :[]= # Not really usefull # def []=(key, value) # store the value (useful on server restarts) @mon.synchronize { old_set(key, value) puts "CLUSTER #{key} = #{value}" cluster.each { |uri, sdrb| begin sdrb.server_sync(key, value) rescue => ex $log.error "Server at #{uri} is down, removing from cluster" cluster.delete(uri) end } } end # Use this, avoids syncing the original server, and avoids a # nasty deadlock. # def cluster_sync(key, value, server_uri) # store the value (useful on server restarts) @mon.synchronize { old_set(key, value) puts "CLUSTER #{key} = #{value}" cluster.each { |uri, sdrb| begin sdrb.server_sync(key, value) unless uri == server_uri rescue => ex $log.error "Server at #{uri} is down, removing from cluster" cluster.delete(uri) end } } end def [](key) @mon.synchronize { puts "LOOKUP #{key}" return super } end end # = SHash ServerHash # # The SHash 'endpoint' resides in the App server # class SHash < Hash attr :mon # drbobject for this hash (local) attr_accessor :ldrb # drb for the cluster hash attr_accessor :cdrb # ldrb = local drb uri # cdrb = cluster drb uri # def initialize(ldrb_uri = "druby://:9000", cdrb_uri = "druby://:8000") @mon = Monitor.new @ldrb_uri = ldrb_uri @ldrb = DRb.start_service(ldrb_uri, self) @cdrb = DRbObject.new(nil, cdrb_uri) @cdrb.join(ldrb_uri) end alias_method :old_set, :[]= # # def []=(key, value) # store the value in the local hash @mon.synchronize { puts "LOCAL #{key} = #{value}" old_set(key, value) @cdrb.cluster_sync(key, value, @ldrb_uri) } end # If the key is not found in the local hash, try the # cluster hash. # def [](key) @mon.synchronize { unless value = super value = @cdrb[key] old_set(key, value) end return value } end # Called by the cluster # def server_sync(key, value) puts "SYNC #{key} = #{value}" @mon.synchronize { old_set(key, value) } end end # = Clm Cluster Last Modified Hash # class Clm < CHash def [](key) @mon.synchronize { unless value = super puts "INIT #{key}" value = Time.now.to_i old_set(key, value) end return value } end end # = Slm Server Last Modified Hash # class Slm < SHash def set!(key, lm = nil) lm = Time.now.to_i unless lm self[key] = lm return lm end end def initialize(name = "Cluster") super end def run N::Cluster::Clm.new DRb.start_service("druby://:8001", N::SessionManager.new) while true sleep(5000) end super end end end # module if $0 == __FILE__ require "logger"; $log = Logger.new(STDERR) N::Cluster.new.exec() end