lib/rufus/sc/scheduler.rb in rufus-scheduler-2.0.11 vs lib/rufus/sc/scheduler.rb in rufus-scheduler-2.0.12
- old
+ new
@@ -103,10 +103,12 @@
@jobs = get_queue(:at, opts)
@cron_jobs = get_queue(:cron, opts)
@frequency = @options[:frequency] || 0.330
+
+ @mutexes = {}
end
# Instantiates and starts a new Rufus::Scheduler.
#
def self.start_new(opts={})
@@ -196,29 +198,46 @@
#--
# MISC
#++
- # Feel free to override this method. The default implementation simply
- # outputs the error message to STDOUT
+ # Determines if there is #log_exception, #handle_exception or #on_exception
+ # method. If yes, hands the exception to it, else defaults to outputting
+ # details to $stderr.
#
- def handle_exception(job, exception)
+ def do_handle_exception(job, exception)
- if self.respond_to?(:log_exception)
- #
- # some kind of backward compatibility
+ begin
- log_exception(exception)
+ [ :log_exception, :handle_exception, :on_exception ].each do |m|
- else
+ next unless self.respond_to?(m)
- puts '=' * 80
- puts "scheduler caught exception :"
- puts exception
- exception.backtrace.each { |l| puts l }
- puts '=' * 80
+ if method(m).arity == 1
+ self.send(m, exception)
+ else
+ self.send(m, job, exception)
+ end
+
+ return
+ # exception was handled successfully
+ end
+
+ rescue Exception => e
+
+ $stderr.puts '*' * 80
+ $stderr.puts 'the exception handling method itself had an issue:'
+ $stderr.puts e
+ $stderr.puts *e.backtrace
+ $stderr.puts '*' * 80
end
+
+ $stderr.puts '=' * 80
+ $stderr.puts 'scheduler caught exception:'
+ $stderr.puts exception
+ $stderr.puts *exception.backtrace
+ $stderr.puts '=' * 80
end
#--
# JOB LOOKUP
#++
@@ -337,14 +356,17 @@
# Else, it will call the block in a dedicated thread.
#
# TODO : clarify, the blocking here blocks the whole scheduler, while
# EmScheduler blocking triggers for the next tick. Not the same thing ...
#
- def trigger_job(blocking, &block)
+ def trigger_job(params, &block)
- if blocking
+ if params[:blocking]
block.call
+ elsif m = params[:mutex]
+ m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex)
+ Thread.new { m.synchronize { block.call } }
else
Thread.new { block.call }
end
end
end
@@ -469,17 +491,23 @@
protected
# If 'blocking' is set to true, the block will get called at the
# 'next_tick'. Else the block will get called via 'defer' (own thread).
#
- def trigger_job(blocking, &block)
+ def trigger_job(params, &block)
- m = blocking ? :next_tick : :defer
- #
- # :next_tick monopolizes the EM
- # :defer executes its block in another thread
+ # :next_tick monopolizes the EM
+ # :defer executes its block in another thread
+ # (if I read the doc carefully...)
- EM.send(m) { block.call }
+ if params[:blocking]
+ EM.next_tick { block.call }
+ elsif m = params[:mutex]
+ m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex)
+ EM.defer { m.synchronize { block.call } }
+ else
+ EM.defer { block.call }
+ end
end
end
#
# This error is thrown when the :timeout attribute triggers