lib/rufus/sc/scheduler.rb in rufus-scheduler-2.0.11 vs lib/rufus/sc/scheduler.rb in rufus-scheduler-2.0.12

- old
+ new

@@ -103,10 +103,12 @@ @jobs = get_queue(:at, opts) @cron_jobs = get_queue(:cron, opts) @frequency = @options[:frequency] || 0.330 + + @mutexes = {} end # Instantiates and starts a new Rufus::Scheduler. # def self.start_new(opts={}) @@ -196,29 +198,46 @@ #-- # MISC #++ - # Feel free to override this method. The default implementation simply - # outputs the error message to STDOUT + # Determines if there is #log_exception, #handle_exception or #on_exception + # method. If yes, hands the exception to it, else defaults to outputting + # details to $stderr. # - def handle_exception(job, exception) + def do_handle_exception(job, exception) - if self.respond_to?(:log_exception) - # - # some kind of backward compatibility + begin - log_exception(exception) + [ :log_exception, :handle_exception, :on_exception ].each do |m| - else + next unless self.respond_to?(m) - puts '=' * 80 - puts "scheduler caught exception :" - puts exception - exception.backtrace.each { |l| puts l } - puts '=' * 80 + if method(m).arity == 1 + self.send(m, exception) + else + self.send(m, job, exception) + end + + return + # exception was handled successfully + end + + rescue Exception => e + + $stderr.puts '*' * 80 + $stderr.puts 'the exception handling method itself had an issue:' + $stderr.puts e + $stderr.puts *e.backtrace + $stderr.puts '*' * 80 end + + $stderr.puts '=' * 80 + $stderr.puts 'scheduler caught exception:' + $stderr.puts exception + $stderr.puts *exception.backtrace + $stderr.puts '=' * 80 end #-- # JOB LOOKUP #++ @@ -337,14 +356,17 @@ # Else, it will call the block in a dedicated thread. # # TODO : clarify, the blocking here blocks the whole scheduler, while # EmScheduler blocking triggers for the next tick. Not the same thing ... # - def trigger_job(blocking, &block) + def trigger_job(params, &block) - if blocking + if params[:blocking] block.call + elsif m = params[:mutex] + m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex) + Thread.new { m.synchronize { block.call } } else Thread.new { block.call } end end end @@ -469,17 +491,23 @@ protected # If 'blocking' is set to true, the block will get called at the # 'next_tick'. Else the block will get called via 'defer' (own thread). # - def trigger_job(blocking, &block) + def trigger_job(params, &block) - m = blocking ? :next_tick : :defer - # - # :next_tick monopolizes the EM - # :defer executes its block in another thread + # :next_tick monopolizes the EM + # :defer executes its block in another thread + # (if I read the doc carefully...) - EM.send(m) { block.call } + if params[:blocking] + EM.next_tick { block.call } + elsif m = params[:mutex] + m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex) + EM.defer { m.synchronize { block.call } } + else + EM.defer { block.call } + end end end # # This error is thrown when the :timeout attribute triggers