# * George Moschovitis
# (c) 2004-2005 Navel, all rights reserved.
# $Id: cluster.rb 249 2005-02-04 14:03:00Z gmosx $

# WARNING: This is old code, not updated to work in the
# latest nitro release. Will be fixed ASAP.

$:.unshift 'lib'

require 'drb'
require 'monitor'

require 'nitro/application'
require 'nitro/server'
require 'glue/cache'
require 'nitro/server/session'

module N

  # A Cluster manages the state of a collection of servers. The cluster
  # synchronizes the servers and distributes the state. An older version
  # used a polling system, ie: the servers polled the cluster to
  # obtain the state. This version uses a push system, ie when the
  # state is changed a delta is pushed to the clients.
  #
  # TODO: use Sync instead of Monitor
  class Cluster < N::Application

    # The CHash 'endpoint' resides in the Cluster server.
    #
    # It is a Hash that publishes itself as a DRb front object and keeps
    # a registry of joined servers. Every write is stored locally and then
    # pushed to the registered servers through their +server_sync+ method.
    class CHash < Hash
      # Monitor guarding reads, writes and the server registry.
      attr_reader :mon

      # Value returned by DRb.start_service for this hash (local).
      attr_accessor :ldrb

      # The cluster, use a cluster to implement a set
      # (one server per drb_uri). Maps a server uri to the DRbObject
      # used to reach that server.
      attr_accessor :cluster

      # Starts a DRb service with this hash as its front object.
      #
      # ldrb_uri:: uri the local DRb service is started on.
      def initialize(ldrb_uri = "druby://:8000")
        super()
        @mon = Monitor.new
        @ldrb = DRb.start_service(ldrb_uri, self)
        @cluster = {}
      end

      # Registers the server reachable at +sdrb_uri+. Joining again with
      # the same uri replaces the previous entry.
      def join(sdrb_uri)
        @mon.synchronize do
          cluster[sdrb_uri] = DRbObject.new(nil, sdrb_uri)
        end
      end

      # Keep the plain Hash#[]= reachable; subclasses rely on it too.
      alias_method :old_set, :[]=

      # Stores the value and pushes it to every joined server.
      #
      # Not really useful: prefer cluster_sync, which skips the server
      # that originated the change.
      def []=(key, value)
        store_and_push(key, value)
      end

      # Use this, avoids syncing the original server, and avoids a
      # nasty deadlock.
      #
      # key, value:: the pair to store and distribute.
      # server_uri:: uri of the originating server; it is not synced.
      def cluster_sync(key, value, server_uri)
        store_and_push(key, value, server_uri)
      end

      # Logs the lookup and returns the stored value for +key+.
      def [](key)
        @mon.synchronize do
          puts "LOOKUP #{key}"
          super
        end
      end

      private

      # Stores the pair locally (useful on server restarts) and calls
      # +server_sync+ on every joined server whose uri differs from
      # +skip_uri+. A server whose call raises is logged and dropped
      # from the registry. Being private, this helper is not callable
      # through DRb.
      def store_and_push(key, value, skip_uri = nil)
        @mon.synchronize do
          old_set(key, value)
          puts "CLUSTER #{key} = #{value}"

          cluster.each do |uri, sdrb|
            begin
              sdrb.server_sync(key, value) unless uri == skip_uri
            rescue
              Logger.error "Server at #{uri} is down, removing from cluster"
              cluster.delete(uri)
            end
          end
        end
      end
    end

    # The SHash 'endpoint' resides in the App server.
    #
    # It is a local Hash that forwards every write to the cluster hash
    # and falls back to the cluster hash on a local miss.
    class SHash < Hash
      # Monitor guarding local reads and writes.
      attr_reader :mon

      # Value returned by DRb.start_service for this hash (local).
      attr_accessor :ldrb

      # DRbObject for the cluster hash.
      attr_accessor :cdrb

      # Starts the local DRb service and joins the cluster.
      #
      # ldrb_uri:: local drb uri
      # cdrb_uri:: cluster drb uri
      def initialize(ldrb_uri = "druby://:9000", cdrb_uri = "druby://:8000")
        super()
        @mon = Monitor.new
        @ldrb_uri = ldrb_uri
        @ldrb = DRb.start_service(ldrb_uri, self)
        @cdrb = DRbObject.new(nil, cdrb_uri)
        # Register this server so the cluster can push changes to it.
        @cdrb.join(ldrb_uri)
      end

      # Keep the plain Hash#[]= reachable for local-only stores.
      alias_method :old_set, :[]=

      # Stores the value in the local hash, then hands it to the cluster,
      # passing the local uri so the cluster does not sync it back here.
      def []=(key, value)
        @mon.synchronize do
          puts "LOCAL #{key} = #{value}"
          old_set(key, value)
          @cdrb.cluster_sync(key, value, @ldrb_uri)
        end
      end

      # If the key is not found in the local hash, try the
      # cluster hash. Whatever the cluster returns is stored locally.
      def [](key)
        @mon.synchronize do
          value = super
          unless value
            value = @cdrb[key]
            old_set(key, value)
          end
          value
        end
      end

      # Called by the cluster. Stores the pair locally only; nothing is
      # sent back to the cluster.
      def server_sync(key, value)
        puts "SYNC #{key} = #{value}"
        @mon.synchronize do
          old_set(key, value)
        end
      end
    end

    # Cluster Last Modified Hash.
    #
    # A lookup that finds no value stores the current time (integer
    # seconds) under the key and returns it.
    class Clm < CHash
      def [](key)
        @mon.synchronize do
          value = super
          unless value
            puts "INIT #{key}"
            value = Time.now.to_i
            # Local store only: the fresh stamp is not pushed to servers.
            old_set(key, value)
          end
          value
        end
      end
    end

    # = Slm Server Last Modified Hash
    class Slm < SHash
      # Sets the last-modified stamp for +key+ and returns it.
      #
      # lm:: the stamp to store; defaults to the current time in
      #      integer seconds.
      def set!(key, lm = nil)
        lm ||= Time.now.to_i
        self[key] = lm
        lm
      end
    end

    def initialize(name = "Cluster")
      super
    end

    # Creates the cluster last-modified hash (which starts its own DRb
    # service), starts a second DRb service for a session manager on
    # port 8001 and then sleeps forever.
    #
    # This method never returns, so Application#run is never reached;
    # the former trailing +super+ was dead code and has been removed.
    def run
      N::Cluster::Clm.new
      DRb.start_service("druby://:8001", N::SessionManager.new)

      loop do
        sleep(5000)
      end
    end
  end
end

# Run the cluster server when this file is executed directly.
if $PROGRAM_NAME == __FILE__
  require 'logger'
  N::Cluster.new.exec
end