lib/bluth/worker.rb in bluth-0.6.1 vs lib/bluth/worker.rb in bluth-0.6.7
- old
+ new
@@ -25,12 +25,12 @@
module WorkerBase
def init(h=nil, u=nil, w=nil)
@host, @user, @wid, = h || Bluth.sysinfo.hostname, u || Bluth.sysinfo.user, w
- @pid_file ||= "/tmp/#{self.class.prefix}-#{wid}.pid"
- @log_file ||= "/tmp/#{self.class.prefix}-#{wid}.log"
+ @pid_file ||= "/tmp/#{name}.pid"
+ @log_file ||= "/tmp/#{name}.log"
@success ||= 0
@failure ||= 0
@problem ||= 0
end
@@ -39,11 +39,11 @@
@wid
end
# Used by daemonize as the process name (linux only)
def name
- "bluth-#{self.class.prefix}-#{wid}"
+ [self.class.prefix, wid].flatten.join '-'
end
#def rediskey(suffix=nil)
# self.class.rediskey index, suffix
#end
@@ -119,11 +119,11 @@
end
include WorkerBase
include Familia
include Logging
include Daemonizable
- prefix :worker
+ prefix [:bluth, :worker]
index [:host, :user, :wid]
field :host
field :user
field :wid
field :process_id => Integer
@@ -143,10 +143,11 @@
end
end
def run!
begin
+ Bluth.connect
self.class.runblock :onstart
find_gob
rescue => ex
msg = "#{ex.class}: #{ex.message}"
Familia.info msg
@@ -161,15 +162,16 @@
end
def run
begin
@process_id = $$
- save
- self.class.runblock :onstart
scheduler = Rufus::Scheduler.start_new
- Familia.info "Setting interval: #{Worker.interval} sec (poptimeout: #{Bluth.poptimeout})"
+ Bluth.connect
+ self.class.runblock :onstart
+ Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})"
Familia.reconnect_all! # Need to reconnect after daemonize
+ save
scheduler.every Worker.interval, :blocking => true do |task|
Familia.ld "#{$$} TICK @ #{Time.now.utc}"
sleep rand
find_gob task
end
@@ -182,11 +184,11 @@
self.class.runblock :onexit
destroy!
rescue Interrupt => ex
puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n")
Exiting...
- (You may need to wait up to #{Bluth.poptimeout} seconds
+ (You may need to wait up to #{Bluth.queuetimeout} seconds
for this worker to exit cleanly.)
EOS
# We reconnect to the queue in case we're currently
# waiting on a brpop (blocking pop) timeout.
self.class.runblock :onexit
@@ -250,12 +252,10 @@
#end
end
end
end
- # TODO: Refactor somehow. When this is subclassed
- # (eg BS::SchduleWorker) the self.object is not created.
class ScheduleWorker < Storable
include WorkerBase
include Familia
include Logging
include Daemonizable
@@ -273,11 +273,11 @@
@every << [interval, opts, blk]
end
@every
end
end
- prefix :scheduler
+ prefix [:bluth, :scheduler]
index [:host, :user, :wid]
field :host
field :user
field :wid
field :process_id => Integer
@@ -289,16 +289,19 @@
run
end
def run
begin
- raise Familia::Problem, "Only 1 scheduler at a time" if !ScheduleWorker.instances.empty?
EM.run {
@process_id = $$
srand(Bluth.salt.to_i(16) ** @process_id)
- ScheduleWorker.schedule = Rufus::Scheduler::EmScheduler.start_new
- save # persist and make note the scheduler is running
+ Bluth.connect
+ Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})"
+ Familia.reconnect_all! # Need to reconnect after daemonize
+ raise Familia::Problem, "Only 1 scheduler at a time" if !ScheduleWorker.instances.empty?
self.class.runblock :onstart
+ save # persist and make note the scheduler is running
+ ScheduleWorker.schedule = Rufus::Scheduler::EmScheduler.start_new
self.class.every.each do |args|
interval, opts, blk = *args
Familia.ld " scheduling every #{interval}: #{opts}"
ScheduleWorker.schedule.every interval, opts, &blk
end