lib/bluth/worker.rb in bluth-0.7.0 vs lib/bluth/worker.rb in bluth-0.7.2
- old
+ new
@@ -76,19 +76,31 @@
module ClassMethods
def from_redis(wid)
me = new nil, nil, wid
super(me.index)
end
+ def reconnect! name=nil
+ Familia.info "Reconnecting #{name}"
+ attempt = 0
+ success = false
+ reconnect_tries.times {
+ attempt += 1
+ Familia.info " reconnecting in #{reconnect_delay} (\##{attempt} of #{reconnect_tries})..."
+ success = Bluth.reconnect! reconnect_delay
+ break if success
+ }
+ success
+ end
def run!(*args)
me = new
- Familia.info "Created: #{me.rediskey}"
+ Familia.info "Created: #{me.index}"
me.run!
me
end
def run(*args)
me = new
- Familia.info "Created: #{me.rediskey}"
+ Familia.info "Created: #{me.index}"
me.run
me
end
def kill(pid_file)
self.class.runblock :onexit
@@ -97,10 +109,14 @@
end
def onstart &blk
@onstart = blk unless blk.nil?
@onstart
end
+ def onerror &blk
+ @onerror = blk unless blk.nil?
+ @onerror
+ end
def onexit &blk
@onexit = blk unless blk.nil?
@onexit
end
# A convenience method for calling onstart/onexit blocks
@@ -111,13 +127,15 @@
end
end
end
class Worker < Storable
- @interval = 2 #.seconds
+ @interval = 1 #.seconds
+ @reconnect_delay = 15 #.seconds
+ @reconnect_tries = 20
class << self
- attr_accessor :interval
+ attr_accessor :interval, :reconnect_delay, :reconnect_tries
end
include WorkerBase
include Familia
include Logging
include Daemonizable
@@ -141,65 +159,103 @@
self.instance_variable_set '@current_job', ''
update_time! # calls save
end
end
+ def carefully
+ begin
+ yield
+ rescue Errno::ECONNREFUSED => ex
+
+ unless Bluth::Worker.reconnect! self.index
+ self.class.onerror.call ex, self if self.class.onerror
+ Familia.info "Reconnect failed :["
+ end
+
+ rescue => ex
+ if self.class.onerror
+ self.class.onerror.call ex, self
+ else
+ Familia.info ex.message
+ Familia.ld ex.backtrace
+ problem!
+ end
+ #if problem > 5
+ # ## TODO: SEND EMAIL
+ # task.unschedule unless task.nil? # Kill this worker b/c something is clearly wrong
+ # destroy!
+ # EM.stop
+ # exit 1
+ #end
+ end
+ end
+
def run!
begin
Bluth.connect
self.class.runblock :onstart
- find_gob
- rescue => ex
- msg = "#{ex.class}: #{ex.message}"
- Familia.info msg
- Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug?
- self.class.runblock :onexit
- destroy!
+ carefully do
+ find_gob
+ end
rescue Interrupt => ex
puts $/, "Exiting..."
self.class.runblock :onexit
destroy!
end
end
def run
begin
@process_id = $$
- scheduler = Rufus::Scheduler.start_new
+ @scheduler = Rufus::Scheduler.start_new
Bluth.connect
self.class.runblock :onstart
Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})"
Familia.reconnect_all! # Need to reconnect after daemonize
save
- scheduler.every Worker.interval, :blocking => true do |task|
- Familia.ld "#{$$} TICK @ #{Time.now.utc}" if Familia.debug?
- sleep rand
- find_gob task
+ Signal.trap("USR1") do
+ Familia.debug = (Familia.debug == false)
+ Familia.info "Debugging is #{Familia.debug ? 'enabled' : 'disabled'}"
end
- scheduler.join
-
- rescue => ex
- msg = "#{ex.class}: #{ex.message}"
- Familia.info msg
- Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug?
- self.class.runblock :onexit
- destroy!
+ @usr2_reduce = true
+ Signal.trap("USR2") do
+ @usr2_reduce = false if Bluth.queuetimeout <= 2
+ @usr2_reduce = true if Bluth.queuetimeout >= 60
+ if @usr2_reduce
+ #Worker.interval /= 2.0
+ Bluth.queuetimeout /= 2
+ else
+ #Worker.interval *= 2.0
+ Bluth.queuetimeout *= 2
+ end
+ Familia.info "Set intervals: #{Worker.interval} sec / #{Bluth.queuetimeout} sec"
+ end
+ ## TODO: on_the_minute = Time.at(BS.quantize(Stella.now, 1.minute)+1.minute).utc ## first_at
+ ## @option.ontheminute
+ @task = @scheduler.every Worker.interval, :blocking => true, :first_in => '2s' do |task|
+ carefully do
+ Familia.ld "#{$$} TICK @ #{Time.now.utc}" if Familia.debug?
+ find_gob task
+ end
+ end
+ @scheduler.join
+
rescue Interrupt => ex
puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n")
Exiting...
(You may need to wait up to #{Bluth.queuetimeout} seconds
for this worker to exit cleanly.)
EOS
# We reconnect to the queue in case we're currently
# waiting on a brpop (blocking pop) timeout.
- self.class.runblock :onexit
destroy!
+ ensure
+ self.class.runblock :onexit
end
end
-
private
# DO NOT call return from this method
def find_gob(task=nil)
begin
@@ -220,11 +276,11 @@
job.cpu = [tms.utime,tms.stime,tms.real]
job.save
job.success!
self.success!
end
- end
+ end
rescue Bluth::Shutdown => ex
msg = "Shutdown requested: #{ex.message}"
job.success! msg
Familia.info msg
task.unschedule
@@ -237,21 +293,14 @@
rescue Bluth::Buster => ex
Familia.info ex.message
job.failure! ex.message
self.failure!
rescue => ex
- Familia.info ex.message
Familia.info ex.backtrace
job.retry! "#{ex.class}: #{ex.message}" if job
- problem!
- #if problem > 5
- # ## TODO: SEND EMAIL
- # task.unschedule unless task.nil? # Kill this worker b/c something is clearly wrong
- # destroy!
- # EM.stop
- # exit 1
- #end
+ raise ex
end
+
end
end
class ScheduleWorker < Storable
include WorkerBase