lib/rocket_job/server/model.rb in rocketjob-5.1.1 vs lib/rocket_job/server/model.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,16 +1,16 @@ -require 'yaml' -require 'active_support/concern' +require "yaml" +require "active_support/concern" module RocketJob class Server # Model attributes module Model extend ActiveSupport::Concern included do - store_in collection: 'rocket_job.servers' + store_in collection: "rocket_job.servers" # Unique Name of this server instance # Default: `host name:PID` # The unique name is used on re-start to re-queue any jobs that were being processed # at the time the server unexpectedly terminated, if any @@ -22,17 +22,17 @@ # When this server process was started field :started_at, type: Time # The heartbeat information for this server - embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat' + embeds_one :heartbeat, class_name: "RocketJob::Heartbeat" # Current state # Internal use only. Do not set this field directly field :state, type: Symbol, default: :starting - index({name: 1}, background: true, unique: true, drop_dups: true) + index({name: 1}, background: true, unique: true) validates_presence_of :state, :name, :max_workers # Requeue any jobs being worked by this server when it is destroyed before_destroy :requeue_jobs @@ -56,22 +56,23 @@ # Example no servers active: # RocketJob::Server.counts_by_state # # => {} def self.counts_by_state counts = {} - collection.aggregate([{'$group' => {_id: '$state', count: {'$sum' => 1}}}]).each do |result| - counts[result['_id'].to_sym] = result['count'] + collection.aggregate([{"$group" => {_id: "$state", count: {"$sum" => 1}}}]).each do |result| + counts[result["_id"].to_sym] = result["count"] end counts end # Destroy's all instances of zombie servers and requeues any jobs still "running" # on those servers. def self.destroy_zombies count = 0 each do |server| next unless server.zombie? + logger.warn "Destroying zombie server #{server.name}, and requeueing its jobs" server.destroy count += 1 end count @@ -81,13 +82,13 @@ def self.zombies(missed = 4) dead_seconds = Config.heartbeat_seconds * missed last_heartbeat_time = Time.now - dead_seconds where( :state.in => %i[stopping running paused], - '$or' => [ - {'heartbeat.updated_at' => {'$exists' => false}}, - {'heartbeat.updated_at' => {'$lte' => last_heartbeat_time}} + "$or" => [ + {"heartbeat.updated_at" => {"$exists" => false}}, + {"heartbeat.updated_at" => {"$lte" => last_heartbeat_time}} ] ) end end @@ -98,29 +99,29 @@ # - The server process is "hanging" # - The server is no longer able to communicate with the MongoDB Server def zombie?(missed = 4) return false unless running? || stopping? || paused? return true if heartbeat.nil? || heartbeat.updated_at.nil? + dead_seconds = Config.heartbeat_seconds * missed (Time.now - heartbeat.updated_at) >= dead_seconds end # Updates the heartbeat and returns a refreshed server instance. def refresh(worker_count) SemanticLogger.silence(:info) do find_and_update( - 'heartbeat.updated_at' => Time.now, - 'heartbeat.workers' => worker_count + "heartbeat.updated_at" => Time.now, + "heartbeat.workers" => worker_count ) end end private # Requeue any jobs assigned to this server when it is destroyed def requeue_jobs RocketJob::Job.requeue_dead_server(name) end - end end end