lib/rocket_job/server.rb in rocketjob-3.5.2 vs lib/rocket_job/server.rb in rocketjob-4.0.0

- old
+ new

@@ -1,5 +1,6 @@ +require 'yaml' require 'concurrent' module RocketJob # Server # # On startup a server instance will automatically register itself @@ -43,11 +44,11 @@ # When this server process was started field :started_at, type: Time # Filter to apply to control which job classes this server can process - field :filter, type: Hash + field :yaml_filter, type: String # The heartbeat information for this server embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat' # Current state @@ -186,11 +187,11 @@ # Run the server process # Attributes supplied are passed to #new def self.run(attrs = {}) Thread.current.name = 'rocketjob main' # Create Indexes on server startup - Mongoid::Tasks::Database.create_indexes + ::Mongoid::Tasks::Database.create_indexes register_signal_handlers server = create!(attrs) server.send(:run) ensure @@ -226,20 +227,30 @@ return true if heartbeat.nil? || heartbeat.updated_at.nil? dead_seconds = Config.instance.heartbeat_seconds * missed (Time.now - heartbeat.updated_at) >= dead_seconds end + # Where clause filter to apply to workers looking for jobs + def filter + YAML.load(yaml_filter) if yaml_filter + end + + def filter=(hash) + self.yaml_filter = hash.nil? ? nil : hash.to_yaml + end + private # Returns [Array<Worker>] collection of workers def workers @workers ||= [] end # Management Thread def run logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}" + logger.info('Running with filter', filter) if filter build_heartbeat(updated_at: Time.now, workers: 0) started! logger.info 'Rocket Job Server started' run_workers @@ -260,10 +271,10 @@ ) end end logger.info 'Shutdown' - rescue Mongoid::Errors::DocumentNotFound + rescue ::Mongoid::Errors::DocumentNotFound logger.warn('Server has been destroyed. Going down hard!') rescue Exception => exc logger.error('RocketJob::Server is stopping due to an exception', exc) ensure # Logs the backtrace for each running worker