lib/ruby_skynet/zookeeper/registry.rb in ruby_skynet-1.1.1 vs lib/ruby_skynet/zookeeper/registry.rb in ruby_skynet-1.2.0

- old
+ new

@@ -35,10 +35,14 @@ # Mandatory # # :ephemeral [Boolean] # All set operations of non-nil values will result in ephemeral nodes. # + # :on_connect [Proc] + # Block to call after the connection to Zookeeper has been established + # and every time the connection is re-established + # # :registry [Hash|ZooKeeper] # ZooKeeper configuration information, or an existing # ZooKeeper ( ZooKeeper client) instance # # :servers [Array of String] @@ -70,43 +74,37 @@ @root = @root[0..-2] if @root.end_with?("/") @root_with_trail = "#{@root}/" @root = '/' if @root == '' registry_config = params.delete(:registry) || {} - if registry_config.is_a?(::Zookeeper::Client) - @zookeeper = registry_config - else - servers = registry_config.delete(:servers) || ['127.0.0.1:2181'] - connect_timeout = (registry_config.delete(:connect_timeout) || 10).to_f - # Generate warning log entries for any unknown configuration options - registry_config.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: zookeeper.#{k}"} + # server1:2181,server2:2181,server3:2181 + @servers = (registry_config.delete(:servers) || ['127.0.0.1:2181']).join(',') + @connect_timeout = (registry_config.delete(:connect_timeout) || 10).to_f - # Create Zookeeper connection - # server1:2181,server2:2181,server3:2181 - @zookeeper = ::Zookeeper.new(servers.join(','), connect_timeout, watcher) - end + # Generate warning log entries for any unknown configuration options + registry_config.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: zookeeper.#{k}"} # Allow the serializer and deserializer implementations to be replaced @serializer = params.delete(:serializer) || RubySkynet::Zookeeper::Json::Serializer @deserializer = params.delete(:deserializer) || RubySkynet::Zookeeper::Json::Deserializer @ephemeral = params.delete(:ephemeral) @ephemeral = false if @ephemeral.nil? + @on_connect = params.delete(:on_connect) + # Generate warning log entries for any unknown configuration options params.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: #{k}"} # Hash with Array values containing the list of children for each node, if any @children = ThreadSafe::Hash.new - # Start watching registry for any changes - get_recursive(@root, watch=true, create_path=true, &block) + # Block is used in init + @block = block - at_exit do - close - end + self.init end # Retrieve the latest value from a specific path from the registry # Returns nil when the key is not present in the registry def [](key) @@ -324,73 +322,87 @@ else @zookeeper.create(:path => full_path) end end - # returns the watcher proc for this registry instances + # returns the watcher proc for this registry instance def watcher # Subscription block to call for watch events @watch_proc ||= Proc.new do |event_hash| - path = event_hash[:path] - case event_hash[:type] - when ::Zookeeper::ZOO_CHANGED_EVENT - logger.debug "Node '#{path}' Changed", event_hash + begin + path = event_hash[:path] + logger.trace "Event Received", event_hash + case event_hash[:type] + when ::Zookeeper::ZOO_CHANGED_EVENT + logger.debug "Node '#{path}' Changed", event_hash - # Fetch current value and re-subscribe - result = @zookeeper.get(:path => path, :watcher => @watch_proc) - check_rc(result) - value = @deserializer.deserialize(result[:data]) - stat = result[:stat] + # Fetch current value and re-subscribe + result = @zookeeper.get(:path => path, :watcher => @watch_proc) + check_rc(result) + value = @deserializer.deserialize(result[:data]) + stat = result[:stat] - # Invoke on_update callbacks - node_updated(relative_key(path), value, stat.version) + # Invoke on_update callbacks + node_updated(relative_key(path), value, stat.version) - when ::Zookeeper::ZOO_DELETED_EVENT - # A node has been deleted - # TODO How to ignore child deleted when it is a directory, not a leaf - logger.debug "Node '#{path}' Deleted", event_hash - @children.delete(path) - node_deleted(relative_key(path)) + when ::Zookeeper::ZOO_DELETED_EVENT + # A node has been deleted + # TODO How to ignore child deleted when it is a directory, not a leaf + logger.debug "Node '#{path}' Deleted", event_hash + @children.delete(path) + node_deleted(relative_key(path)) - when ::Zookeeper::ZOO_CHILD_EVENT - # The list of nodes has changed - Does not say if it was added or removed - logger.debug "Node '#{path}' Child changed", event_hash - result = @zookeeper.get_children(:path => path, :watcher => @watch_proc) + when ::Zookeeper::ZOO_CHILD_EVENT + # The list of nodes has changed - Does not say if it was added or removed + logger.debug "Node '#{path}' Child changed", event_hash + result = @zookeeper.get_children(:path => path, :watcher => @watch_proc) - # This node could have been deleted already - if result[:rc] == ::Zookeeper::ZOK - current_children = result[:children] - previous_children = @children[path] + # This node could have been deleted already + if result[:rc] == ::Zookeeper::ZOK + current_children = result[:children] + previous_children = @children[path] - # Save children so that we can later identify new children - @children[path] = current_children + # Save children so that we can later identify new children + @children[path] = current_children - # New Child Nodes - new_nodes = previous_children ? (current_children - previous_children) : current_children - new_nodes.each do |child| - get_recursive(File.join(path,child), true) do |key, value, version| - node_created(key, value, version) + # New Child Nodes + new_nodes = previous_children ? (current_children - previous_children) : current_children + new_nodes.each do |child| + get_recursive(File.join(path,child), true) do |key, value, version| + node_created(key, value, version) + end end + # Ignore Deleted Child Nodes since they will be handled by the Deleted Node event end - # Ignore Deleted Child Nodes since they will be handled by the Deleted Node event - end - when ::Zookeeper::ZOO_CREATED_EVENT - # Node created events are only created for paths that were deleted - # and then created again - # No op - This is covered by node_child created event - logger.debug "Node '#{path}' Created - No op", event_hash + when ::Zookeeper::ZOO_CREATED_EVENT + # Node created events are only created for paths that were deleted + # and then created again + # No op - This is covered by node_child created event + logger.debug "Node '#{path}' Created - No op", event_hash - when ::Zookeeper::ZOO_SESSION_EVENT - logger.debug "Session Event: #{@zookeeper.state_by_value(event_hash[:state])}", event_hash + when ::Zookeeper::ZOO_SESSION_EVENT + logger.debug "Session Event: #{@zookeeper.state_by_value(event_hash[:state]) if @zookeeper}", event_hash - when ::Zookeeper::ZOO_NOTWATCHING_EVENT - logger.debug "Ignoring ZOO_NOTWATCHING_EVENT", event_hash + # Replace zookeeper connection since it is stale. Only react to global request + # since this event will be received for every node being watched. + # Do not close the current connection since this background watcher thread is running + # as part of the current zookeeper connection + # event_hash => {:req_id=>-1, :type=>-1, :state=>-112, :path=>"", :context=>nil} + Thread.new { self.init } if (event_hash[:req_id] == -1) && (event_hash[:state] == ::Zookeeper::ZOO_EXPIRED_SESSION_STATE) - else - # TODO Need to re-load registry when re-connected - logger.warn "Ignoring unknown event", event_hash + when ::Zookeeper::ZOO_NOTWATCHING_EVENT + logger.debug "Ignoring ZOO_NOTWATCHING_EVENT", event_hash + + else + # TODO Need to re-load registry when re-connected + logger.warn "Ignoring unknown event", event_hash + end + rescue ::Zookeeper::Exceptions::ZookeeperException => exc + logger.warn "Watching thread failed due to Zookeeper failure", exc + rescue Exception => exc + logger.error "Watching thread failed due to unhandled exception", exc end end end # Recursively fetches all the values in the registry and optionally @@ -506,9 +518,27 @@ end # Any subscribers for all events? if all_subscribers = @delete_subscribers['*'] all_subscribers.each{|subscriber| subscriber.call(key)} + end + end + + # Create ZooKeeper connection and start watching the registry for any changes + def init + logger.benchmark_info "Connected to Zookeeper" do + @zookeeper.close if @zookeeper + # Create Zookeeper connection + @zookeeper = ::Zookeeper.new(@servers, @connect_timeout, watcher) + at_exit do + @zookeeper.close if @zookeeper + end + + # Start watching registry for any changes + get_recursive(@root, watch=true, create_path=true, &@block) + + # Call on_connect callback if supplied + @on_connect.call(self) if @on_connect end end end end \ No newline at end of file