lib/bluth/worker.rb in bluth-0.6.1 vs lib/bluth/worker.rb in bluth-0.6.7

- old
+ new

@@ -25,12 +25,12 @@ module WorkerBase def init(h=nil, u=nil, w=nil) @host, @user, @wid, = h || Bluth.sysinfo.hostname, u || Bluth.sysinfo.user, w - @pid_file ||= "/tmp/#{self.class.prefix}-#{wid}.pid" - @log_file ||= "/tmp/#{self.class.prefix}-#{wid}.log" + @pid_file ||= "/tmp/#{name}.pid" + @log_file ||= "/tmp/#{name}.log" @success ||= 0 @failure ||= 0 @problem ||= 0 end @@ -39,11 +39,11 @@ @wid end # Used by daemonize as the process name (linux only) def name - "bluth-#{self.class.prefix}-#{wid}" + [self.class.prefix, wid].flatten.join '-' end #def rediskey(suffix=nil) # self.class.rediskey index, suffix #end @@ -119,11 +119,11 @@ end include WorkerBase include Familia include Logging include Daemonizable - prefix :worker + prefix [:bluth, :worker] index [:host, :user, :wid] field :host field :user field :wid field :process_id => Integer @@ -143,10 +143,11 @@ end end def run! begin + Bluth.connect self.class.runblock :onstart find_gob rescue => ex msg = "#{ex.class}: #{ex.message}" Familia.info msg @@ -161,15 +162,16 @@ end def run begin @process_id = $$ - save - self.class.runblock :onstart scheduler = Rufus::Scheduler.start_new - Familia.info "Setting interval: #{Worker.interval} sec (poptimeout: #{Bluth.poptimeout})" + Bluth.connect + self.class.runblock :onstart + Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})" Familia.reconnect_all! # Need to reconnect after daemonize + save scheduler.every Worker.interval, :blocking => true do |task| Familia.ld "#{$$} TICK @ #{Time.now.utc}" sleep rand find_gob task end @@ -182,11 +184,11 @@ self.class.runblock :onexit destroy! rescue Interrupt => ex puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n") Exiting... - (You may need to wait up to #{Bluth.poptimeout} seconds + (You may need to wait up to #{Bluth.queuetimeout} seconds for this worker to exit cleanly.) EOS # We reconnect to the queue in case we're currently # waiting on a brpop (blocking pop) timeout. self.class.runblock :onexit @@ -250,12 +252,10 @@ #end end end end - # TODO: Refactor somehow. When this is subclassed - # (eg BS::SchduleWorker) the self.object is not created. class ScheduleWorker < Storable include WorkerBase include Familia include Logging include Daemonizable @@ -273,11 +273,11 @@ @every << [interval, opts, blk] end @every end end - prefix :scheduler + prefix [:bluth, :scheduler] index [:host, :user, :wid] field :host field :user field :wid field :process_id => Integer @@ -289,16 +289,19 @@ run end def run begin - raise Familia::Problem, "Only 1 scheduler at a time" if !ScheduleWorker.instances.empty? EM.run { @process_id = $$ srand(Bluth.salt.to_i(16) ** @process_id) - ScheduleWorker.schedule = Rufus::Scheduler::EmScheduler.start_new - save # persist and make note the scheduler is running + Bluth.connect + Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})" + Familia.reconnect_all! # Need to reconnect after daemonize + raise Familia::Problem, "Only 1 scheduler at a time" if !ScheduleWorker.instances.empty? self.class.runblock :onstart + save # persist and make note the scheduler is running + ScheduleWorker.schedule = Rufus::Scheduler::EmScheduler.start_new self.class.every.each do |args| interval, opts, blk = *args Familia.ld " scheduling every #{interval}: #{opts}" ScheduleWorker.schedule.every interval, opts, &blk end