lib/bluth/worker.rb in bluth-0.7.0 vs lib/bluth/worker.rb in bluth-0.7.2

- old
+ new

@@ -76,19 +76,31 @@ module ClassMethods def from_redis(wid) me = new nil, nil, wid super(me.index) end + def reconnect! name=nil + Familia.info "Reconnecting #{name}" + attempt = 0 + success = false + reconnect_tries.times { + attempt += 1 + Familia.info " reconnecting in #{reconnect_delay} (\##{attempt} of #{reconnect_tries})..." + success = Bluth.reconnect! reconnect_delay + break if success + } + success + end def run!(*args) me = new - Familia.info "Created: #{me.rediskey}" + Familia.info "Created: #{me.index}" me.run! me end def run(*args) me = new - Familia.info "Created: #{me.rediskey}" + Familia.info "Created: #{me.index}" me.run me end def kill(pid_file) self.class.runblock :onexit @@ -97,10 +109,14 @@ end def onstart &blk @onstart = blk unless blk.nil? @onstart end + def onerror &blk + @onerror = blk unless blk.nil? + @onerror + end def onexit &blk @onexit = blk unless blk.nil? @onexit end # A convenience method for calling onstart/onexit blocks @@ -111,13 +127,15 @@ end end end class Worker < Storable - @interval = 2 #.seconds + @interval = 1 #.seconds + @reconnect_delay = 15 #.seconds + @reconnect_tries = 20 class << self - attr_accessor :interval + attr_accessor :interval, :reconnect_delay, :reconnect_tries end include WorkerBase include Familia include Logging include Daemonizable @@ -141,65 +159,103 @@ self.instance_variable_set '@current_job', '' update_time! # calls save end end + def carefully + begin + yield + rescue Errno::ECONNREFUSED => ex + + unless Bluth::Worker.reconnect! self.index + self.class.onerror.call ex, self if self.class.onerror + Familia.info "Reconnect failed :[" + end + + rescue => ex + if self.class.onerror + self.class.onerror.call ex, self + else + Familia.info ex.message + Familia.ld ex.backtrace + problem! + end + #if problem > 5 + # ## TODO: SEND EMAIL + # task.unschedule unless task.nil? # Kill this worker b/c something is clearly wrong + # destroy! + # EM.stop + # exit 1 + #end + end + end + def run! begin Bluth.connect self.class.runblock :onstart - find_gob - rescue => ex - msg = "#{ex.class}: #{ex.message}" - Familia.info msg - Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug? - self.class.runblock :onexit - destroy! + carefully do + find_gob + end rescue Interrupt => ex puts $/, "Exiting..." self.class.runblock :onexit destroy! end end def run begin @process_id = $$ - scheduler = Rufus::Scheduler.start_new + @scheduler = Rufus::Scheduler.start_new Bluth.connect self.class.runblock :onstart Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})" Familia.reconnect_all! # Need to reconnect after daemonize save - scheduler.every Worker.interval, :blocking => true do |task| - Familia.ld "#{$$} TICK @ #{Time.now.utc}" if Familia.debug? - sleep rand - find_gob task + Signal.trap("USR1") do + Familia.debug = (Familia.debug == false) + Familia.info "Debugging is #{Familia.debug ? 'enabled' : 'disabled'}" end - scheduler.join - - rescue => ex - msg = "#{ex.class}: #{ex.message}" - Familia.info msg - Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug? - self.class.runblock :onexit - destroy! + @usr2_reduce = true + Signal.trap("USR2") do + @usr2_reduce = false if Bluth.queuetimeout <= 2 + @usr2_reduce = true if Bluth.queuetimeout >= 60 + if @usr2_reduce + #Worker.interval /= 2.0 + Bluth.queuetimeout /= 2 + else + #Worker.interval *= 2.0 + Bluth.queuetimeout *= 2 + end + Familia.info "Set intervals: #{Worker.interval} sec / #{Bluth.queuetimeout} sec" + end + ## TODO: on_the_minute = Time.at(BS.quantize(Stella.now, 1.minute)+1.minute).utc ## first_at + ## @option.ontheminute + @task = @scheduler.every Worker.interval, :blocking => true, :first_in => '2s' do |task| + carefully do + Familia.ld "#{$$} TICK @ #{Time.now.utc}" if Familia.debug? + find_gob task + end + end + @scheduler.join + rescue Interrupt => ex puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n") Exiting... (You may need to wait up to #{Bluth.queuetimeout} seconds for this worker to exit cleanly.) EOS # We reconnect to the queue in case we're currently # waiting on a brpop (blocking pop) timeout. - self.class.runblock :onexit destroy! + ensure + self.class.runblock :onexit end end - private # DO NOT call return from this method def find_gob(task=nil) begin @@ -220,11 +276,11 @@ job.cpu = [tms.utime,tms.stime,tms.real] job.save job.success! self.success! end - end + end rescue Bluth::Shutdown => ex msg = "Shutdown requested: #{ex.message}" job.success! msg Familia.info msg task.unschedule @@ -237,21 +293,14 @@ rescue Bluth::Buster => ex Familia.info ex.message job.failure! ex.message self.failure! rescue => ex - Familia.info ex.message Familia.info ex.backtrace job.retry! "#{ex.class}: #{ex.message}" if job - problem! - #if problem > 5 - # ## TODO: SEND EMAIL - # task.unschedule unless task.nil? # Kill this worker b/c something is clearly wrong - # destroy! - # EM.stop - # exit 1 - #end + raise ex end + end end class ScheduleWorker < Storable include WorkerBase