lib/ruby_skynet/zookeeper/registry.rb in ruby_skynet-1.1.1 vs lib/ruby_skynet/zookeeper/registry.rb in ruby_skynet-1.2.0
- old
+ new
@@ -35,10 +35,14 @@
# Mandatory
#
# :ephemeral [Boolean]
# All set operations of non-nil values will result in ephemeral nodes.
#
+ # :on_connect [Proc]
+ # Block to call after the connection to Zookeeper has been established
+ # and every time the connection is re-established
+ #
# :registry [Hash|ZooKeeper]
# ZooKeeper configuration information, or an existing
# ZooKeeper ( ZooKeeper client) instance
#
# :servers [Array of String]
@@ -70,43 +74,37 @@
@root = @root[0..-2] if @root.end_with?("/")
@root_with_trail = "#{@root}/"
@root = '/' if @root == ''
registry_config = params.delete(:registry) || {}
- if registry_config.is_a?(::Zookeeper::Client)
- @zookeeper = registry_config
- else
- servers = registry_config.delete(:servers) || ['127.0.0.1:2181']
- connect_timeout = (registry_config.delete(:connect_timeout) || 10).to_f
- # Generate warning log entries for any unknown configuration options
- registry_config.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: zookeeper.#{k}"}
+ # server1:2181,server2:2181,server3:2181
+ @servers = (registry_config.delete(:servers) || ['127.0.0.1:2181']).join(',')
+ @connect_timeout = (registry_config.delete(:connect_timeout) || 10).to_f
- # Create Zookeeper connection
- # server1:2181,server2:2181,server3:2181
- @zookeeper = ::Zookeeper.new(servers.join(','), connect_timeout, watcher)
- end
+ # Generate warning log entries for any unknown configuration options
+ registry_config.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: zookeeper.#{k}"}
# Allow the serializer and deserializer implementations to be replaced
@serializer = params.delete(:serializer) || RubySkynet::Zookeeper::Json::Serializer
@deserializer = params.delete(:deserializer) || RubySkynet::Zookeeper::Json::Deserializer
@ephemeral = params.delete(:ephemeral)
@ephemeral = false if @ephemeral.nil?
+ @on_connect = params.delete(:on_connect)
+
# Generate warning log entries for any unknown configuration options
params.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: #{k}"}
# Hash with Array values containing the list of children for each node, if any
@children = ThreadSafe::Hash.new
- # Start watching registry for any changes
- get_recursive(@root, watch=true, create_path=true, &block)
+ # Block is used in init
+ @block = block
- at_exit do
- close
- end
+ self.init
end
# Retrieve the latest value from a specific path from the registry
# Returns nil when the key is not present in the registry
def [](key)
@@ -324,73 +322,87 @@
else
@zookeeper.create(:path => full_path)
end
end
- # returns the watcher proc for this registry instances
+ # returns the watcher proc for this registry instance
def watcher
# Subscription block to call for watch events
@watch_proc ||= Proc.new do |event_hash|
- path = event_hash[:path]
- case event_hash[:type]
- when ::Zookeeper::ZOO_CHANGED_EVENT
- logger.debug "Node '#{path}' Changed", event_hash
+ begin
+ path = event_hash[:path]
+ logger.trace "Event Received", event_hash
+ case event_hash[:type]
+ when ::Zookeeper::ZOO_CHANGED_EVENT
+ logger.debug "Node '#{path}' Changed", event_hash
- # Fetch current value and re-subscribe
- result = @zookeeper.get(:path => path, :watcher => @watch_proc)
- check_rc(result)
- value = @deserializer.deserialize(result[:data])
- stat = result[:stat]
+ # Fetch current value and re-subscribe
+ result = @zookeeper.get(:path => path, :watcher => @watch_proc)
+ check_rc(result)
+ value = @deserializer.deserialize(result[:data])
+ stat = result[:stat]
- # Invoke on_update callbacks
- node_updated(relative_key(path), value, stat.version)
+ # Invoke on_update callbacks
+ node_updated(relative_key(path), value, stat.version)
- when ::Zookeeper::ZOO_DELETED_EVENT
- # A node has been deleted
- # TODO How to ignore child deleted when it is a directory, not a leaf
- logger.debug "Node '#{path}' Deleted", event_hash
- @children.delete(path)
- node_deleted(relative_key(path))
+ when ::Zookeeper::ZOO_DELETED_EVENT
+ # A node has been deleted
+ # TODO How to ignore child deleted when it is a directory, not a leaf
+ logger.debug "Node '#{path}' Deleted", event_hash
+ @children.delete(path)
+ node_deleted(relative_key(path))
- when ::Zookeeper::ZOO_CHILD_EVENT
- # The list of nodes has changed - Does not say if it was added or removed
- logger.debug "Node '#{path}' Child changed", event_hash
- result = @zookeeper.get_children(:path => path, :watcher => @watch_proc)
+ when ::Zookeeper::ZOO_CHILD_EVENT
+ # The list of nodes has changed - Does not say if it was added or removed
+ logger.debug "Node '#{path}' Child changed", event_hash
+ result = @zookeeper.get_children(:path => path, :watcher => @watch_proc)
- # This node could have been deleted already
- if result[:rc] == ::Zookeeper::ZOK
- current_children = result[:children]
- previous_children = @children[path]
+ # This node could have been deleted already
+ if result[:rc] == ::Zookeeper::ZOK
+ current_children = result[:children]
+ previous_children = @children[path]
- # Save children so that we can later identify new children
- @children[path] = current_children
+ # Save children so that we can later identify new children
+ @children[path] = current_children
- # New Child Nodes
- new_nodes = previous_children ? (current_children - previous_children) : current_children
- new_nodes.each do |child|
- get_recursive(File.join(path,child), true) do |key, value, version|
- node_created(key, value, version)
+ # New Child Nodes
+ new_nodes = previous_children ? (current_children - previous_children) : current_children
+ new_nodes.each do |child|
+ get_recursive(File.join(path,child), true) do |key, value, version|
+ node_created(key, value, version)
+ end
end
+ # Ignore Deleted Child Nodes since they will be handled by the Deleted Node event
end
- # Ignore Deleted Child Nodes since they will be handled by the Deleted Node event
- end
- when ::Zookeeper::ZOO_CREATED_EVENT
- # Node created events are only created for paths that were deleted
- # and then created again
- # No op - This is covered by node_child created event
- logger.debug "Node '#{path}' Created - No op", event_hash
+ when ::Zookeeper::ZOO_CREATED_EVENT
+ # Node created events are only created for paths that were deleted
+ # and then created again
+ # No op - This is covered by node_child created event
+ logger.debug "Node '#{path}' Created - No op", event_hash
- when ::Zookeeper::ZOO_SESSION_EVENT
- logger.debug "Session Event: #{@zookeeper.state_by_value(event_hash[:state])}", event_hash
+ when ::Zookeeper::ZOO_SESSION_EVENT
+ logger.debug "Session Event: #{@zookeeper.state_by_value(event_hash[:state]) if @zookeeper}", event_hash
- when ::Zookeeper::ZOO_NOTWATCHING_EVENT
- logger.debug "Ignoring ZOO_NOTWATCHING_EVENT", event_hash
+ # Replace zookeeper connection since it is stale. Only react to global request
+ # since this event will be received for every node being watched.
+ # Do not close the current connection since this background watcher thread is running
+ # as part of the current zookeeper connection
+ # event_hash => {:req_id=>-1, :type=>-1, :state=>-112, :path=>"", :context=>nil}
+ Thread.new { self.init } if (event_hash[:req_id] == -1) && (event_hash[:state] == ::Zookeeper::ZOO_EXPIRED_SESSION_STATE)
- else
- # TODO Need to re-load registry when re-connected
- logger.warn "Ignoring unknown event", event_hash
+ when ::Zookeeper::ZOO_NOTWATCHING_EVENT
+ logger.debug "Ignoring ZOO_NOTWATCHING_EVENT", event_hash
+
+ else
+ # TODO Need to re-load registry when re-connected
+ logger.warn "Ignoring unknown event", event_hash
+ end
+ rescue ::Zookeeper::Exceptions::ZookeeperException => exc
+ logger.warn "Watching thread failed due to Zookeeper failure", exc
+ rescue Exception => exc
+ logger.error "Watching thread failed due to unhandled exception", exc
end
end
end
# Recursively fetches all the values in the registry and optionally
@@ -506,9 +518,27 @@
end
# Any subscribers for all events?
if all_subscribers = @delete_subscribers['*']
all_subscribers.each{|subscriber| subscriber.call(key)}
+ end
+ end
+
+ # Create ZooKeeper connection and start watching the registry for any changes
+ def init
+ logger.benchmark_info "Connected to Zookeeper" do
+ @zookeeper.close if @zookeeper
+ # Create Zookeeper connection
+ @zookeeper = ::Zookeeper.new(@servers, @connect_timeout, watcher)
+ at_exit do
+ @zookeeper.close if @zookeeper
+ end
+
+ # Start watching registry for any changes
+ get_recursive(@root, watch=true, create_path=true, &@block)
+
+ # Call on_connect callback if supplied
+ @on_connect.call(self) if @on_connect
end
end
end
end
\ No newline at end of file