lib/rocket_job/server.rb in rocketjob-3.5.2 vs lib/rocket_job/server.rb in rocketjob-4.0.0
- old
+ new
@@ -1,5 +1,6 @@
+require 'yaml'
require 'concurrent'
module RocketJob
# Server
#
# On startup a server instance will automatically register itself
@@ -43,11 +44,11 @@
# When this server process was started
field :started_at, type: Time
# Filter to apply to control which job classes this server can process
- field :filter, type: Hash
+ field :yaml_filter, type: String
# The heartbeat information for this server
embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat'
# Current state
@@ -186,11 +187,11 @@
# Run the server process
# Attributes supplied are passed to #new
def self.run(attrs = {})
Thread.current.name = 'rocketjob main'
# Create Indexes on server startup
- Mongoid::Tasks::Database.create_indexes
+ ::Mongoid::Tasks::Database.create_indexes
register_signal_handlers
server = create!(attrs)
server.send(:run)
ensure
@@ -226,20 +227,30 @@
return true if heartbeat.nil? || heartbeat.updated_at.nil?
dead_seconds = Config.instance.heartbeat_seconds * missed
(Time.now - heartbeat.updated_at) >= dead_seconds
end
+ # Where clause filter to apply to workers looking for jobs
+ def filter
+ YAML.load(yaml_filter) if yaml_filter
+ end
+
+ def filter=(hash)
+ self.yaml_filter = hash.nil? ? nil : hash.to_yaml
+ end
+
private
# Returns [Array<Worker>] collection of workers
def workers
@workers ||= []
end
# Management Thread
def run
logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
+ logger.info('Running with filter', filter) if filter
build_heartbeat(updated_at: Time.now, workers: 0)
started!
logger.info 'Rocket Job Server started'
run_workers
@@ -260,10 +271,10 @@
)
end
end
logger.info 'Shutdown'
- rescue Mongoid::Errors::DocumentNotFound
+ rescue ::Mongoid::Errors::DocumentNotFound
logger.warn('Server has been destroyed. Going down hard!')
rescue Exception => exc
logger.error('RocketJob::Server is stopping due to an exception', exc)
ensure
# Logs the backtrace for each running worker