lib/daemon_runner/client.rb in daemon_runner-0.2.2 vs lib/daemon_runner/client.rb in daemon_runner-0.3.0

- old
+ new

@@ -1,5 +1,7 @@ +require 'rufus-scheduler' + module DaemonRunner class Client include Logger # @attribute [r] @@ -7,10 +9,26 @@ attr_reader :options def initialize(options) @options = options + + # Set error handling + # @param [Rufus::Scheduler::Job] job job that raised the error + # @param [RuntimeError] error the body of the error + def scheduler.on_error(job, error) + error_sleep_time = job[:error_sleep_time] + logger = job[:logger] + task_id = job[:task_id] + + logger.error "#{task_id}: #{error}" + logger.debug "#{task_id}: Suspending #{task_id} for #{error_sleep_time} seconds" + job.pause + sleep error_sleep_time + logger.debug "#{task_id}: Resuming #{task_id}" + job.resume + end end # Hook to allow initial setup tasks before running tasks. # @abstract Override {#wait} to pause before starting. # @return [void] @@ -31,11 +49,18 @@ def tasks raise NotImplementedError, 'Must implement this in a subclass. \ This must be an array of methods for the runner to call' end - # @return [Fixnum] Number of seconds to sleep between loop interations. + # @return [Array<Symbol, String/Fixnum>] Schedule tuple-like with the type of schedule and its timing. + def schedule + # The default type is an `interval` which trigger, execute and then trigger again after + # the interval has elapsed. + [:interval, loop_sleep_time] + end + + # @return [Fixnum] Number of seconds to sleep between loop interactions. def loop_sleep_time return @loop_sleep_time unless @loop_sleep_time.nil? @loop_sleep_time = if options[:loop_sleep_time].nil? 5 else @@ -66,26 +91,20 @@ # Start the service # @return [nil] def start! wait - loop do # Loop on tasks - logger.warn 'Tasks list is empty' if tasks.empty? - tasks.each do |task| - run_task(task) - sleep post_task_sleep_time - end - - sleep loop_sleep_time + logger.warn 'Tasks list is empty' if tasks.empty? + tasks.each do |task| + run_task(task) + sleep post_task_sleep_time end - rescue StandardError => e - # Don't exit the process if initialization fails. - logger.error(e) - - sleep error_sleep_time - retry + scheduler.join + rescue SystemExit, Interrupt + logger.info 'Shutting down' + scheduler.shutdown end private # @private @@ -101,32 +120,78 @@ else out[:instance].class.to_s end out[:method] = task[1] + + out[:task_id] = if out[:instance].respond_to?(:task_id) + out[:instance].send(:task_id).to_s + else + "#{out[:class_name]}.#{out[:method]}" + end + raise ArgumentError, 'Invalid task id' if out[:task_id].nil? || out[:task_id].empty? + out[:args] = task[2..-1].flatten out end # @private + # @param [Class] instance an instance of the task class + # @return [Hash<Symbol, String>] schedule parsed in parts: Schedule type and timing + def parse_schedule(instance) + valid_types = [:in, :at, :every, :interval, :cron] + out = {} + task_schedule = if instance.respond_to?(:schedule) + instance.send(:schedule) + else + schedule + end + + raise ArgumentError, 'Malformed schedule definition, should be [TYPE, DURATION]' if task_schedule.length < 2 + raise ArgumentError, 'Invalid schedule type' unless valid_types.include?(task_schedule[0].to_sym) + + out[:type] = task_schedule[0].to_sym + out[:schedule] = task_schedule[1] + out + end + + # @private # @param [Array<String, String, Array>] task to run # @return [String] output returned from task def run_task(task) parsed_task = parse_task(task) instance = parsed_task[:instance] + schedule = parse_schedule(instance) class_name = parsed_task[:class_name] method = parsed_task[:method] args = parsed_task[:args] - log_line = "Running #{class_name}.#{method}" - log_line += "(#{args})" unless args.empty? - logger.debug log_line + task_id = parsed_task[:task_id] - out = if args.empty? - instance.send(method.to_sym) - else - instance.send(method.to_sym, args) - end - logger.debug "Got: #{out}" - out + # Schedule the task + schedule_log_line = "#{task_id}: Scheduling job #{class_name}.#{method} as `:#{schedule[:type]}` type" + schedule_log_line += " with schedule: #{schedule[:schedule]}" + logger.debug schedule_log_line + + scheduler.send(schedule[:type], schedule[:schedule], :overlap => false, :job => true) do |job| + log_line = "#{task_id}: Running #{class_name}.#{method}" + log_line += "(#{args})" unless args.empty? + logger.debug log_line + + job[:error_sleep_time] = error_sleep_time + job[:logger] = logger + job[:task_id] = task_id + + out = if args.empty? + instance.send(method.to_sym) + else + instance.send(method.to_sym, args) + end + logger.debug "#{task_id}: Got: #{out}" + end + end + + # @return [Rufus::Scheduler] A scheduler instance + def scheduler + @scheduler ||= ::Rufus::Scheduler.new end end end