lib/resque/cluster/member.rb in resque-cluster-0.1.1.1 vs lib/resque/cluster/member.rb in resque-cluster-0.2.0
- old
+ new
@@ -3,18 +3,22 @@
module Resque
class Cluster
# Member is a single member of a resque pool cluster
class Member
- attr_reader :hostname, :pool, :local_config, :global_config
+ attr_reader :hostname, :pool, :config
def initialize(started_pool)
@pool = started_pool
- @local_config = parse_config(Cluster.config[:local_config_path])
- @global_config = parse_config(Cluster.config[:global_config_path])
- @global_config = @local_config if global_config.empty?
- @worker_count_manager = initialize_gru
+ @config = Config.new(Cluster.config[:local_config_path], Cluster.config[:global_config_path])
+ if @config.verified?
+ @config.log_warnings
+ @worker_count_manager = initialize_gru
+ else
+ @config.log_errors
+ @pool.quit
+ end
end
def perform
check_for_worker_count_adjustment
end
@@ -23,18 +27,19 @@
remove_counts
unqueue_all_workers
end
def check_for_worker_count_adjustment
+ return unless gru_is_inititalized?
host_count_adjustment = @worker_count_manager.adjust_workers
adjust_worker_counts(host_count_adjustment) if host_count_adjustment
end
private
def global_prefix
- "cluster:#{Cluster.config[:cluster_name]}:#{Cluster.config[:environment]}"
+ "cluster:#{Cluster.config[:cluster_name]}:#{Cluster.config[:environment]}:#{@config.version_git_hash}"
end
def member_prefix
"#{global_prefix}:#{hostname}"
end
@@ -42,63 +47,48 @@
def running_workers_key_name
"#{member_prefix}:running_workers"
end
def initialize_gru
- Gru.create(cluster_member_settings)
+ Gru.create(@config.gru_format)
end
def hostname
@hostname ||= Socket.gethostname
end
def adjust_worker_counts(count_adjustments)
count_adjustments.each do |worker, count|
- next if count == 0
@pool.adjust_worker_counts(worker, count)
update_counts
end
end
- def parse_config(config_path)
- return {} unless config_path && File.exist?(config_path)
- YAML.load(ERB.new(IO.read(config_path)).result)
- end
-
def remove_counts
Resque.redis.del(running_workers_key_name)
end
def unqueue_all_workers
- @worker_count_manager.release_workers
+ @worker_count_manager.release_workers if gru_is_inititalized?
end
def unqueue_workers(workers)
workers = Array(workers)
workers.each do |worker|
- @worker_count_manager.release_workers(worker)
+ @worker_count_manager.release_workers(worker) if gru_is_inititalized?
end
end
def update_counts
current_workers = @pool.config
current_workers.each do |key, value|
Resque.redis.hset(running_workers_key_name, key, value)
end
end
- def cluster_member_settings
- {
- cluster_maximums: @global_config["global_maximums"] || @global_config,
- host_maximums: @local_config,
- client_settings: Resque.redis.client.options,
- rebalance_flag: @global_config["rebalance_cluster"] || false,
- max_workers_per_host: @global_config["max_workers_per_host"] || nil,
- cluster_name: Cluster.config[:cluster_name],
- environment_name: Cluster.config[:environment],
- presume_host_dead_after: @global_config["presume_dead_after"] || 120,
- manage_worker_heartbeats: true
- }
+ def gru_is_inititalized?
+ ! @worker_count_manager.nil?
end
+
end
end
end