lib/daemon_runner/client.rb in daemon_runner-0.2.2 vs lib/daemon_runner/client.rb in daemon_runner-0.3.0
- old
+ new
@@ -1,5 +1,7 @@
+require 'rufus-scheduler'
+
module DaemonRunner
class Client
include Logger
# @attribute [r]
@@ -7,10 +9,26 @@
attr_reader :options
def initialize(options)
@options = options
+
+ # Set error handling
+ # @param [Rufus::Scheduler::Job] job job that raised the error
+ # @param [RuntimeError] error the body of the error
+ def scheduler.on_error(job, error)
+ error_sleep_time = job[:error_sleep_time]
+ logger = job[:logger]
+ task_id = job[:task_id]
+
+ logger.error "#{task_id}: #{error}"
+ logger.debug "#{task_id}: Suspending #{task_id} for #{error_sleep_time} seconds"
+ job.pause
+ sleep error_sleep_time
+ logger.debug "#{task_id}: Resuming #{task_id}"
+ job.resume
+ end
end
# Hook to allow initial setup tasks before running tasks.
# @abstract Override {#wait} to pause before starting.
# @return [void]
@@ -31,11 +49,18 @@
def tasks
raise NotImplementedError, 'Must implement this in a subclass. \
This must be an array of methods for the runner to call'
end
- # @return [Fixnum] Number of seconds to sleep between loop interations.
+ # @return [Array<Symbol, String/Fixnum>] Schedule tuple-like with the type of schedule and its timing.
+ def schedule
+ # The default type is an `interval` which trigger, execute and then trigger again after
+ # the interval has elapsed.
+ [:interval, loop_sleep_time]
+ end
+
+ # @return [Fixnum] Number of seconds to sleep between loop interactions.
def loop_sleep_time
return @loop_sleep_time unless @loop_sleep_time.nil?
@loop_sleep_time = if options[:loop_sleep_time].nil?
5
else
@@ -66,26 +91,20 @@
# Start the service
# @return [nil]
def start!
wait
- loop do # Loop on tasks
- logger.warn 'Tasks list is empty' if tasks.empty?
- tasks.each do |task|
- run_task(task)
- sleep post_task_sleep_time
- end
-
- sleep loop_sleep_time
+ logger.warn 'Tasks list is empty' if tasks.empty?
+ tasks.each do |task|
+ run_task(task)
+ sleep post_task_sleep_time
end
- rescue StandardError => e
- # Don't exit the process if initialization fails.
- logger.error(e)
-
- sleep error_sleep_time
- retry
+ scheduler.join
+ rescue SystemExit, Interrupt
+ logger.info 'Shutting down'
+ scheduler.shutdown
end
private
# @private
@@ -101,32 +120,78 @@
else
out[:instance].class.to_s
end
out[:method] = task[1]
+
+ out[:task_id] = if out[:instance].respond_to?(:task_id)
+ out[:instance].send(:task_id).to_s
+ else
+ "#{out[:class_name]}.#{out[:method]}"
+ end
+ raise ArgumentError, 'Invalid task id' if out[:task_id].nil? || out[:task_id].empty?
+
out[:args] = task[2..-1].flatten
out
end
# @private
+ # @param [Class] instance an instance of the task class
+ # @return [Hash<Symbol, String>] schedule parsed in parts: Schedule type and timing
+ def parse_schedule(instance)
+ valid_types = [:in, :at, :every, :interval, :cron]
+ out = {}
+ task_schedule = if instance.respond_to?(:schedule)
+ instance.send(:schedule)
+ else
+ schedule
+ end
+
+ raise ArgumentError, 'Malformed schedule definition, should be [TYPE, DURATION]' if task_schedule.length < 2
+ raise ArgumentError, 'Invalid schedule type' unless valid_types.include?(task_schedule[0].to_sym)
+
+ out[:type] = task_schedule[0].to_sym
+ out[:schedule] = task_schedule[1]
+ out
+ end
+
+ # @private
# @param [Array<String, String, Array>] task to run
# @return [String] output returned from task
def run_task(task)
parsed_task = parse_task(task)
instance = parsed_task[:instance]
+ schedule = parse_schedule(instance)
class_name = parsed_task[:class_name]
method = parsed_task[:method]
args = parsed_task[:args]
- log_line = "Running #{class_name}.#{method}"
- log_line += "(#{args})" unless args.empty?
- logger.debug log_line
+ task_id = parsed_task[:task_id]
- out = if args.empty?
- instance.send(method.to_sym)
- else
- instance.send(method.to_sym, args)
- end
- logger.debug "Got: #{out}"
- out
+ # Schedule the task
+ schedule_log_line = "#{task_id}: Scheduling job #{class_name}.#{method} as `:#{schedule[:type]}` type"
+ schedule_log_line += " with schedule: #{schedule[:schedule]}"
+ logger.debug schedule_log_line
+
+ scheduler.send(schedule[:type], schedule[:schedule], :overlap => false, :job => true) do |job|
+ log_line = "#{task_id}: Running #{class_name}.#{method}"
+ log_line += "(#{args})" unless args.empty?
+ logger.debug log_line
+
+ job[:error_sleep_time] = error_sleep_time
+ job[:logger] = logger
+ job[:task_id] = task_id
+
+ out = if args.empty?
+ instance.send(method.to_sym)
+ else
+ instance.send(method.to_sym, args)
+ end
+ logger.debug "#{task_id}: Got: #{out}"
+ end
+ end
+
+ # @return [Rufus::Scheduler] A scheduler instance
+ def scheduler
+ @scheduler ||= ::Rufus::Scheduler.new
end
end
end