lib/resque/cluster/member.rb in resque-cluster-0.1.1.1 vs lib/resque/cluster/member.rb in resque-cluster-0.2.0

- old
+ new

@@ -3,18 +3,22 @@ module Resque class Cluster # Member is a single member of a resque pool cluster class Member - attr_reader :hostname, :pool, :local_config, :global_config + attr_reader :hostname, :pool, :config def initialize(started_pool) @pool = started_pool - @local_config = parse_config(Cluster.config[:local_config_path]) - @global_config = parse_config(Cluster.config[:global_config_path]) - @global_config = @local_config if global_config.empty? - @worker_count_manager = initialize_gru + @config = Config.new(Cluster.config[:local_config_path], Cluster.config[:global_config_path]) + if @config.verified? + @config.log_warnings + @worker_count_manager = initialize_gru + else + @config.log_errors + @pool.quit + end end def perform check_for_worker_count_adjustment end @@ -23,18 +27,19 @@ remove_counts unqueue_all_workers end def check_for_worker_count_adjustment + return unless gru_is_inititalized? host_count_adjustment = @worker_count_manager.adjust_workers adjust_worker_counts(host_count_adjustment) if host_count_adjustment end private def global_prefix - "cluster:#{Cluster.config[:cluster_name]}:#{Cluster.config[:environment]}" + "cluster:#{Cluster.config[:cluster_name]}:#{Cluster.config[:environment]}:#{@config.version_git_hash}" end def member_prefix "#{global_prefix}:#{hostname}" end @@ -42,63 +47,48 @@ def running_workers_key_name "#{member_prefix}:running_workers" end def initialize_gru - Gru.create(cluster_member_settings) + Gru.create(@config.gru_format) end def hostname @hostname ||= Socket.gethostname end def adjust_worker_counts(count_adjustments) count_adjustments.each do |worker, count| - next if count == 0 @pool.adjust_worker_counts(worker, count) update_counts end end - def parse_config(config_path) - return {} unless config_path && File.exist?(config_path) - YAML.load(ERB.new(IO.read(config_path)).result) - end - def remove_counts Resque.redis.del(running_workers_key_name) end def unqueue_all_workers - @worker_count_manager.release_workers + @worker_count_manager.release_workers if gru_is_inititalized? end def unqueue_workers(workers) workers = Array(workers) workers.each do |worker| - @worker_count_manager.release_workers(worker) + @worker_count_manager.release_workers(worker) if gru_is_inititalized? end end def update_counts current_workers = @pool.config current_workers.each do |key, value| Resque.redis.hset(running_workers_key_name, key, value) end end - def cluster_member_settings - { - cluster_maximums: @global_config["global_maximums"] || @global_config, - host_maximums: @local_config, - client_settings: Resque.redis.client.options, - rebalance_flag: @global_config["rebalance_cluster"] || false, - max_workers_per_host: @global_config["max_workers_per_host"] || nil, - cluster_name: Cluster.config[:cluster_name], - environment_name: Cluster.config[:environment], - presume_host_dead_after: @global_config["presume_dead_after"] || 120, - manage_worker_heartbeats: true - } + def gru_is_inititalized? + ! @worker_count_manager.nil? end + end end end