lib/rocket_job/server/model.rb in rocketjob-5.1.1 vs lib/rocket_job/server/model.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,16 +1,16 @@
-require 'yaml'
-require 'active_support/concern'
+require "yaml"
+require "active_support/concern"
module RocketJob
class Server
# Model attributes
module Model
extend ActiveSupport::Concern
included do
- store_in collection: 'rocket_job.servers'
+ store_in collection: "rocket_job.servers"
# Unique Name of this server instance
# Default: `host name:PID`
# The unique name is used on re-start to re-queue any jobs that were being processed
# at the time the server unexpectedly terminated, if any
@@ -22,17 +22,17 @@
# When this server process was started
field :started_at, type: Time
# The heartbeat information for this server
- embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat'
+ embeds_one :heartbeat, class_name: "RocketJob::Heartbeat"
# Current state
# Internal use only. Do not set this field directly
field :state, type: Symbol, default: :starting
- index({name: 1}, background: true, unique: true, drop_dups: true)
+ index({name: 1}, background: true, unique: true)
validates_presence_of :state, :name, :max_workers
# Requeue any jobs being worked by this server when it is destroyed
before_destroy :requeue_jobs
@@ -56,22 +56,23 @@
# Example no servers active:
# RocketJob::Server.counts_by_state
# # => {}
def self.counts_by_state
counts = {}
- collection.aggregate([{'$group' => {_id: '$state', count: {'$sum' => 1}}}]).each do |result|
- counts[result['_id'].to_sym] = result['count']
+ collection.aggregate([{"$group" => {_id: "$state", count: {"$sum" => 1}}}]).each do |result|
+ counts[result["_id"].to_sym] = result["count"]
end
counts
end
# Destroy's all instances of zombie servers and requeues any jobs still "running"
# on those servers.
def self.destroy_zombies
count = 0
each do |server|
next unless server.zombie?
+
logger.warn "Destroying zombie server #{server.name}, and requeueing its jobs"
server.destroy
count += 1
end
count
@@ -81,13 +82,13 @@
def self.zombies(missed = 4)
dead_seconds = Config.heartbeat_seconds * missed
last_heartbeat_time = Time.now - dead_seconds
where(
:state.in => %i[stopping running paused],
- '$or' => [
- {'heartbeat.updated_at' => {'$exists' => false}},
- {'heartbeat.updated_at' => {'$lte' => last_heartbeat_time}}
+ "$or" => [
+ {"heartbeat.updated_at" => {"$exists" => false}},
+ {"heartbeat.updated_at" => {"$lte" => last_heartbeat_time}}
]
)
end
end
@@ -98,29 +99,29 @@
# - The server process is "hanging"
# - The server is no longer able to communicate with the MongoDB Server
def zombie?(missed = 4)
return false unless running? || stopping? || paused?
return true if heartbeat.nil? || heartbeat.updated_at.nil?
+
dead_seconds = Config.heartbeat_seconds * missed
(Time.now - heartbeat.updated_at) >= dead_seconds
end
# Updates the heartbeat and returns a refreshed server instance.
def refresh(worker_count)
SemanticLogger.silence(:info) do
find_and_update(
- 'heartbeat.updated_at' => Time.now,
- 'heartbeat.workers' => worker_count
+ "heartbeat.updated_at" => Time.now,
+ "heartbeat.workers" => worker_count
)
end
end
private
# Requeue any jobs assigned to this server when it is destroyed
def requeue_jobs
RocketJob::Job.requeue_dead_server(name)
end
-
end
end
end