lib/daemon_runner/client.rb in daemon_runner-0.3.0 vs lib/daemon_runner/client.rb in daemon_runner-0.4.0
- old
+ new
@@ -15,19 +15,26 @@
# Set error handling
# @param [Rufus::Scheduler::Job] job job that raised the error
# @param [RuntimeError] error the body of the error
def scheduler.on_error(job, error)
error_sleep_time = job[:error_sleep_time]
+ on_error_release_lock = job[:on_error_release_lock]
logger = job[:logger]
task_id = job[:task_id]
+ mutex = @mutexes[task_id]
+
logger.error "#{task_id}: #{error}"
+ logger.debug "#{task_id}: #{error.backtrace.join("\n")}"
logger.debug "#{task_id}: Suspending #{task_id} for #{error_sleep_time} seconds"
- job.pause
+
+ # Unlock the job mutex if the job owns it and on_error_release_lock is true
+ mutex.unlock if on_error_release_lock && mutex.owned?
+
sleep error_sleep_time
+
logger.debug "#{task_id}: Resuming #{task_id}"
- job.resume
end
end
# Hook to allow initial setup tasks before running tasks.
# @abstract Override {#wait} to pause before starting.
@@ -56,10 +63,20 @@
# The default type is an `interval` which trigger, execute and then trigger again after
# the interval has elapsed.
[:interval, loop_sleep_time]
end
+ # @return [Boolean] Whether to release a mutex lock if an error occurs in a task.
+ def on_error_release_lock
+ return @on_error_release_lock unless @on_error_release_lock.nil?
+ @on_error_release_lock = if options[:on_error_release_lock].nil?
+ true
+ else
+ options[:on_error_release_lock]
+ end
+ end
+
# @return [Fixnum] Number of seconds to sleep between loop interactions.
def loop_sleep_time
return @loop_sleep_time unless @loop_sleep_time.nil?
@loop_sleep_time = if options[:loop_sleep_time].nil?
5
@@ -121,15 +138,18 @@
out[:instance].class.to_s
end
out[:method] = task[1]
- out[:task_id] = if out[:instance].respond_to?(:task_id)
- out[:instance].send(:task_id).to_s
+ if out[:instance].instance_variable_defined?(:@task_id)
+ out[:task_id] = out[:instance].instance_variable_get(:@task_id)
+ elsif out[:instance].respond_to?(:task_id)
+ out[:task_id] = out[:instance].send(:task_id).to_s
else
- "#{out[:class_name]}.#{out[:method]}"
+ out[:task_id] = "#{out[:class_name]}.#{out[:method]}"
end
+
raise ArgumentError, 'Invalid task id' if out[:task_id].nil? || out[:task_id].empty?
out[:args] = task[2..-1].flatten
out
end
@@ -138,21 +158,24 @@
# @param [Class] instance an instance of the task class
# @return [Hash<Symbol, String>] schedule parsed in parts: Schedule type and timing
def parse_schedule(instance)
valid_types = [:in, :at, :every, :interval, :cron]
out = {}
- task_schedule = if instance.respond_to?(:schedule)
- instance.send(:schedule)
+ if instance.instance_variable_defined?(:@schedule)
+ task_schedule = instance.instance_variable_get(:@schedule)
+ elsif instance.respond_to?(:schedule)
+ task_schedule = instance.send(:schedule)
else
- schedule
+ task_schedule = schedule
end
raise ArgumentError, 'Malformed schedule definition, should be [TYPE, DURATION]' if task_schedule.length < 2
raise ArgumentError, 'Invalid schedule type' unless valid_types.include?(task_schedule[0].to_sym)
out[:type] = task_schedule[0].to_sym
out[:schedule] = task_schedule[1]
+ out[:extra_opts] = task_schedule[2] if task_schedule.length > 2
out
end
# @private
# @param [Array<String, String, Array>] task to run
@@ -169,15 +192,19 @@
# Schedule the task
schedule_log_line = "#{task_id}: Scheduling job #{class_name}.#{method} as `:#{schedule[:type]}` type"
schedule_log_line += " with schedule: #{schedule[:schedule]}"
logger.debug schedule_log_line
- scheduler.send(schedule[:type], schedule[:schedule], :overlap => false, :job => true) do |job|
+ opts = { :overlap => false, :job => true, :mutex => task_id }
+ opts.merge!(schedule[:extra_opts]) if schedule.key?(:extra_opts)
+
+ scheduler.send(schedule[:type], schedule[:schedule], opts) do |job|
log_line = "#{task_id}: Running #{class_name}.#{method}"
log_line += "(#{args})" unless args.empty?
logger.debug log_line
job[:error_sleep_time] = error_sleep_time
+ job[:on_error_release_lock] = on_error_release_lock
job[:logger] = logger
job[:task_id] = task_id
out = if args.empty?
instance.send(method.to_sym)