lib/bluth/worker.rb in bluth-0.5.3 vs lib/bluth/worker.rb in bluth-0.6.0

- old
+ new

@@ -1,54 +1,67 @@ +require 'benchmark' require 'eventmachine' require 'rufus/scheduler' require 'daemonizing' -require 'timeout' +class Rufus::Scheduler::SchedulerCore + # See lib/rufus/sc/scheduler.rb + def handle_exception(task, exception) + case exception + when SystemExit + exit + else + Familia.info exception.message + Familia.info exception.backtrace + task.unschedule + end + end +end + module Bluth @salt = rand.gibbler.shorten(10).freeze class << self attr_reader :salt end module WorkerBase - - def id - @id ||= [host, user, rand, Time.now].gibbler.short + + def init(h=nil, u=nil, w=nil) + @host, @user, @wid, = h || Bluth.sysinfo.hostname, u || Bluth.sysinfo.user, w + @pid_file ||= "/tmp/#{self.class.prefix}-#{wid}.pid" + @log_file ||= "/tmp/#{self.class.prefix}-#{wid}.log" + @success ||= 0 + @failure ||= 0 + @problem ||= 0 end - def longid - [host, user, id].join('-') + def wid + @wid ||= [host, user, rand, Time.now.to_f].gibbler.short + @wid end # Used by daemonize as the process name (linux only) def name - "bs-#{self.class.prefix}-#{id}" + "bluth-#{self.class.prefix}-#{wid}" end - def key(suffix=nil) - self.class.key longid, suffix - end + #def rediskey(suffix=nil) + # self.class.rediskey index, suffix + #end - def initialize - @host, @user = Bluth.sysinfo.hostname, Bluth.sysinfo.user - @pid_file ||= "/tmp/#{self.class.prefix}-#{id}.pid" - @log_file ||= "/tmp/#{self.class.prefix}-#{id}.log" - @success, @failure, @problem = 0, 0, 0 - end - def current_job Gibbler::Digest.new(@current_job || '') end def kill(force=false) if force || host == Bluth.sysinfo.hostname - STDERR.puts "Destroying #{self.index} (this machine is: #{Bluth.sysinfo.hostname}; worker is: #{host})" + Familia.info "Destroying #{self.index} (this machine is: #{Bluth.sysinfo.hostname}; worker is: #{host})" Worker.kill self.pid_file if File.exists?(self.pid_file) rescue Errno::ESRCH File.delete self.log_file if File.exists?(self.log_file) destroy! else - STDERR.puts "Worker #{self.index} not running on #{Bluth.sysinfo.hostname}" + Familia.info "Worker #{self.index} not running on #{Bluth.sysinfo.hostname}" end end def working! gobid @current_job = gobid @@ -60,153 +73,147 @@ obj.extend WorkerBase::ClassMethods end module ClassMethods def from_redis(wid) - me = new - me.id = wid - super(me.longid) + me = new nil, nil, wid + super(me.index) end - def run!(*args) me = new - Familia.info "Created: #{me.key}" + Familia.info "Created: #{me.rediskey}" me.run! me end - def run(*args) me = new - Familia.info "Created: #{me.key}" + Familia.info "Created: #{me.rediskey}" me.run me end - def kill(pid_file) + self.class.runblock :onexit pid = read_pid_file pid_file super(pid_file, 10) end - - + def onstart &blk + @onstart = blk unless blk.nil? + @onstart + end + def onexit &blk + @onexit = blk unless blk.nil? + @onexit + end + # A convenience method for calling onstart/onexit blocks + def runblock meth + blk = self.send(meth) + return if blk.nil? + instance_eval &blk + end end - end class Worker < Storable - include WorkerBase - @interval = 2.seconds + @interval = 2 #.seconds class << self attr_accessor :interval end + include WorkerBase include Familia include Logging include Daemonizable prefix :worker - index :id + index [:host, :user, :wid] field :host field :user - field :id + field :wid field :process_id => Integer field :pid_file field :log_file field :current_job field :success => Integer field :failure => Integer field :problem => Integer include Familia::Stamps - def success! - @success += 1 - @current_job = "" - update_time - save + [:success, :failure, :problem].each do |name| + define_method "#{name}!" do + v = self.send(name) + 1 + self.send :"#{name}=", v + self.instance_variable_set '@current_job', '' + update_time! # calls save + end end - def failure! - @failure += 1 - @current_job = "" - update_time - save - end - def problem! - @problem += 1 - @current_job = "" - update_time - save - end def run! begin + self.class.runblock :onstart find_gob rescue => ex msg = "#{ex.class}: #{ex.message}" - STDERR.puts msg - Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug? + Familia.info msg + Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug? + self.class.runblock :onexit destroy! rescue Interrupt => ex puts $/, "Exiting..." + self.class.runblock :onexit destroy! end end def run begin @process_id = $$ save - + self.class.runblock :onstart scheduler = Rufus::Scheduler.start_new Familia.info "Setting interval: #{Worker.interval} sec (poptimeout: #{Bluth.poptimeout})" Familia.reconnect_all! # Need to reconnect after daemonize - ## TODO: Works but needs to restart scheduler - ##Signal.trap("USR1") do - ## Worker.interval += 1 - ## Familia.info "Setting interval: #{Worker.interval} sec" - ##end - ##Signal.trap("USR2") do - ## Worker.interval -= 1 - ## Familia.info "Setting interval: #{Worker.interval}" - ##end scheduler.every Worker.interval, :blocking => true do |task| Familia.ld "#{$$} TICK @ #{Time.now.utc}" sleep rand find_gob task end scheduler.join rescue => ex msg = "#{ex.class}: #{ex.message}" - STDERR.puts msg - Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug? + Familia.info msg + Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug? + self.class.runblock :onexit destroy! rescue Interrupt => ex puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n") Exiting... (You may need to wait up to #{Bluth.poptimeout} seconds for this worker to exit cleanly.) EOS # We reconnect to the queue in case we're currently # waiting on a brpop (blocking pop) timeout. + self.class.runblock :onexit destroy! end end private - require 'benchmark' + # DO NOT return from this method def find_gob(task=nil) begin job = Bluth.pop unless job.nil? - job.wid = self.id + job.wid = self.wid if job.delayed? job.attempts = 0 job.retry! elsif !job.attempt? job.failure! "Too many attempts" else job.stime = Time.now.utc.to_i - self.working! job.id + self.working! job.jobid tms = Benchmark.measure do job.perform end job.cpu = [tms.utime.fineround(3),tms.stime.fineround(3),tms.real.fineround(3)] job.save @@ -241,116 +248,78 @@ # EM.stop # exit 1 #end end end - end + # TODO: Refactor somehow. When this is subclassed + # (eg BS::SchduleWorker) the self.object is not created. class ScheduleWorker < Storable include WorkerBase + include Familia + include Logging + include Daemonizable @interval = 20 @timeout = 60 #not working + @every = [] class << self - attr_accessor :interval, :timeout + attr_accessor :interval, :timeout, :schedule def interval(v=nil) @interval = v unless v.nil? @interval end + def every interval=nil, opts={}, &blk + unless interval.nil? + @every << [interval, opts, blk] + end + @every + end end - include Familia - include Logging - include Daemonizable prefix :scheduler - index :id + index [:host, :user, :wid] field :host field :user - field :id + field :wid field :process_id => Integer field :pid_file field :log_file - field :scheduled => Integer - field :monitored => Integer - field :timeouts => Integer include Familia::Stamps - attr_reader :schedule - attr_reader :monitors - - def scheduled!(count=1) - @scheduled ||= 0 - @scheduled += count - update_time - save - end - def monitored!(count=1) - @monitored ||= 0 - @monitored += count - update_time - save - end - def timeout!(count=1) - @timeouts ||= 0 - @timeouts += count - update_time - save - end + def run! run end + def run begin - raise Familia::Problem, "Only 1 scheduler at a time" if ScheduleWorker.any? - + raise Familia::Problem, "Only 1 scheduler at a time" if !ScheduleWorker.instances.empty? EM.run { @process_id = $$ srand(Bluth.salt.to_i(16) ** @process_id) - @schedule = Rufus::Scheduler::EmScheduler.start_new + ScheduleWorker.schedule = Rufus::Scheduler::EmScheduler.start_new save # persist and make note the scheduler is running - prepare - @schedule.every self.class.interval, :tags => :keeper do |keeper_task| - begin - scheduled_work(keeper_task) - rescue => ex - msg = "#{ex.class}: #{ex.message}" - STDERR.puts msg - STDERR.puts ex.backtrace - Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug? - end - sleep rand # prevent thrashing + self.class.runblock :onstart + self.class.every.each do |args| + interval, opts, blk = *args + Familia.ld " scheduling every #{interval}: #{opts}" + ScheduleWorker.schedule.every interval, opts, &blk end } rescue => ex msg = "#{ex.class}: #{ex.message}" puts msg - STDERR.puts ex.backtrace - Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug? + Familia.info ex.backtrace + Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug? + self.class.runblock :onexit destroy! rescue Interrupt => ex puts $/, "Exiting..." + self.class.runblock :onexit destroy! end end - protected - - def prepare - end - - def scheduled_work(keeper) - STDOUT.puts "Come on!" - end - end + Bluth.scheduler = Bluth::ScheduleWorker end -class Rufus::Scheduler::SchedulerCore - # See lib/rufus/sc/scheduler.rb - def handle_exception(job, exception) - case exception - when SystemExit - exit - else - super - end - end -end