lib/daemon_runner/client.rb in daemon_runner-0.3.0 vs lib/daemon_runner/client.rb in daemon_runner-0.4.0

- old
+ new

@@ -15,19 +15,26 @@ # Set error handling # @param [Rufus::Scheduler::Job] job job that raised the error # @param [RuntimeError] error the body of the error def scheduler.on_error(job, error) error_sleep_time = job[:error_sleep_time] + on_error_release_lock = job[:on_error_release_lock] logger = job[:logger] task_id = job[:task_id] + mutex = @mutexes[task_id] + logger.error "#{task_id}: #{error}" + logger.debug "#{task_id}: #{error.backtrace.join("\n")}" logger.debug "#{task_id}: Suspending #{task_id} for #{error_sleep_time} seconds" - job.pause + + # Unlock the job mutex if the job owns it and on_error_release_lock is true + mutex.unlock if on_error_release_lock && mutex.owned? + sleep error_sleep_time + logger.debug "#{task_id}: Resuming #{task_id}" - job.resume end end # Hook to allow initial setup tasks before running tasks. # @abstract Override {#wait} to pause before starting. @@ -56,10 +63,20 @@ # The default type is an `interval` which trigger, execute and then trigger again after # the interval has elapsed. [:interval, loop_sleep_time] end + # @return [Boolean] Whether to release a mutex lock if an error occurs in a task. + def on_error_release_lock + return @on_error_release_lock unless @on_error_release_lock.nil? + @on_error_release_lock = if options[:on_error_release_lock].nil? + true + else + options[:on_error_release_lock] + end + end + # @return [Fixnum] Number of seconds to sleep between loop interactions. def loop_sleep_time return @loop_sleep_time unless @loop_sleep_time.nil? @loop_sleep_time = if options[:loop_sleep_time].nil? 5 @@ -121,15 +138,18 @@ out[:instance].class.to_s end out[:method] = task[1] - out[:task_id] = if out[:instance].respond_to?(:task_id) - out[:instance].send(:task_id).to_s + if out[:instance].instance_variable_defined?(:@task_id) + out[:task_id] = out[:instance].instance_variable_get(:@task_id) + elsif out[:instance].respond_to?(:task_id) + out[:task_id] = out[:instance].send(:task_id).to_s else - "#{out[:class_name]}.#{out[:method]}" + out[:task_id] = "#{out[:class_name]}.#{out[:method]}" end + raise ArgumentError, 'Invalid task id' if out[:task_id].nil? || out[:task_id].empty? out[:args] = task[2..-1].flatten out end @@ -138,21 +158,24 @@ # @param [Class] instance an instance of the task class # @return [Hash<Symbol, String>] schedule parsed in parts: Schedule type and timing def parse_schedule(instance) valid_types = [:in, :at, :every, :interval, :cron] out = {} - task_schedule = if instance.respond_to?(:schedule) - instance.send(:schedule) + if instance.instance_variable_defined?(:@schedule) + task_schedule = instance.instance_variable_get(:@schedule) + elsif instance.respond_to?(:schedule) + task_schedule = instance.send(:schedule) else - schedule + task_schedule = schedule end raise ArgumentError, 'Malformed schedule definition, should be [TYPE, DURATION]' if task_schedule.length < 2 raise ArgumentError, 'Invalid schedule type' unless valid_types.include?(task_schedule[0].to_sym) out[:type] = task_schedule[0].to_sym out[:schedule] = task_schedule[1] + out[:extra_opts] = task_schedule[2] if task_schedule.length > 2 out end # @private # @param [Array<String, String, Array>] task to run @@ -169,15 +192,19 @@ # Schedule the task schedule_log_line = "#{task_id}: Scheduling job #{class_name}.#{method} as `:#{schedule[:type]}` type" schedule_log_line += " with schedule: #{schedule[:schedule]}" logger.debug schedule_log_line - scheduler.send(schedule[:type], schedule[:schedule], :overlap => false, :job => true) do |job| + opts = { :overlap => false, :job => true, :mutex => task_id } + opts.merge!(schedule[:extra_opts]) if schedule.key?(:extra_opts) + + scheduler.send(schedule[:type], schedule[:schedule], opts) do |job| log_line = "#{task_id}: Running #{class_name}.#{method}" log_line += "(#{args})" unless args.empty? logger.debug log_line job[:error_sleep_time] = error_sleep_time + job[:on_error_release_lock] = on_error_release_lock job[:logger] = logger job[:task_id] = task_id out = if args.empty? instance.send(method.to_sym)