lib/bluth/worker.rb in bluth-0.5.3 vs lib/bluth/worker.rb in bluth-0.6.0
- old
+ new
@@ -1,54 +1,67 @@
+require 'benchmark'
require 'eventmachine'
require 'rufus/scheduler'
require 'daemonizing'
-require 'timeout'
+class Rufus::Scheduler::SchedulerCore
+ # See lib/rufus/sc/scheduler.rb
+ def handle_exception(task, exception)
+ case exception
+ when SystemExit
+ exit
+ else
+ Familia.info exception.message
+ Familia.info exception.backtrace
+ task.unschedule
+ end
+ end
+end
+
module Bluth
@salt = rand.gibbler.shorten(10).freeze
class << self
attr_reader :salt
end
module WorkerBase
-
- def id
- @id ||= [host, user, rand, Time.now].gibbler.short
+
+ def init(h=nil, u=nil, w=nil)
+ @host, @user, @wid, = h || Bluth.sysinfo.hostname, u || Bluth.sysinfo.user, w
+ @pid_file ||= "/tmp/#{self.class.prefix}-#{wid}.pid"
+ @log_file ||= "/tmp/#{self.class.prefix}-#{wid}.log"
+ @success ||= 0
+ @failure ||= 0
+ @problem ||= 0
end
- def longid
- [host, user, id].join('-')
+ def wid
+ @wid ||= [host, user, rand, Time.now.to_f].gibbler.short
+ @wid
end
# Used by daemonize as the process name (linux only)
def name
- "bs-#{self.class.prefix}-#{id}"
+ "bluth-#{self.class.prefix}-#{wid}"
end
- def key(suffix=nil)
- self.class.key longid, suffix
- end
+ #def rediskey(suffix=nil)
+ # self.class.rediskey index, suffix
+ #end
- def initialize
- @host, @user = Bluth.sysinfo.hostname, Bluth.sysinfo.user
- @pid_file ||= "/tmp/#{self.class.prefix}-#{id}.pid"
- @log_file ||= "/tmp/#{self.class.prefix}-#{id}.log"
- @success, @failure, @problem = 0, 0, 0
- end
-
def current_job
Gibbler::Digest.new(@current_job || '')
end
def kill(force=false)
if force || host == Bluth.sysinfo.hostname
- STDERR.puts "Destroying #{self.index} (this machine is: #{Bluth.sysinfo.hostname}; worker is: #{host})"
+ Familia.info "Destroying #{self.index} (this machine is: #{Bluth.sysinfo.hostname}; worker is: #{host})"
Worker.kill self.pid_file if File.exists?(self.pid_file) rescue Errno::ESRCH
File.delete self.log_file if File.exists?(self.log_file)
destroy!
else
- STDERR.puts "Worker #{self.index} not running on #{Bluth.sysinfo.hostname}"
+ Familia.info "Worker #{self.index} not running on #{Bluth.sysinfo.hostname}"
end
end
def working! gobid
@current_job = gobid
@@ -60,153 +73,147 @@
obj.extend WorkerBase::ClassMethods
end
module ClassMethods
def from_redis(wid)
- me = new
- me.id = wid
- super(me.longid)
+ me = new nil, nil, wid
+ super(me.index)
end
-
def run!(*args)
me = new
- Familia.info "Created: #{me.key}"
+ Familia.info "Created: #{me.rediskey}"
me.run!
me
end
-
def run(*args)
me = new
- Familia.info "Created: #{me.key}"
+ Familia.info "Created: #{me.rediskey}"
me.run
me
end
-
def kill(pid_file)
+ self.class.runblock :onexit
pid = read_pid_file pid_file
super(pid_file, 10)
end
-
-
+ def onstart &blk
+ @onstart = blk unless blk.nil?
+ @onstart
+ end
+ def onexit &blk
+ @onexit = blk unless blk.nil?
+ @onexit
+ end
+ # A convenience method for calling onstart/onexit blocks
+ def runblock meth
+ blk = self.send(meth)
+ return if blk.nil?
+ instance_eval &blk
+ end
end
-
end
class Worker < Storable
- include WorkerBase
- @interval = 2.seconds
+ @interval = 2 #.seconds
class << self
attr_accessor :interval
end
+ include WorkerBase
include Familia
include Logging
include Daemonizable
prefix :worker
- index :id
+ index [:host, :user, :wid]
field :host
field :user
- field :id
+ field :wid
field :process_id => Integer
field :pid_file
field :log_file
field :current_job
field :success => Integer
field :failure => Integer
field :problem => Integer
include Familia::Stamps
- def success!
- @success += 1
- @current_job = ""
- update_time
- save
+ [:success, :failure, :problem].each do |name|
+ define_method "#{name}!" do
+ v = self.send(name) + 1
+ self.send :"#{name}=", v
+ self.instance_variable_set '@current_job', ''
+ update_time! # calls save
+ end
end
- def failure!
- @failure += 1
- @current_job = ""
- update_time
- save
- end
- def problem!
- @problem += 1
- @current_job = ""
- update_time
- save
- end
def run!
begin
+ self.class.runblock :onstart
find_gob
rescue => ex
msg = "#{ex.class}: #{ex.message}"
- STDERR.puts msg
- Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug?
+ Familia.info msg
+ Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug?
+ self.class.runblock :onexit
destroy!
rescue Interrupt => ex
puts $/, "Exiting..."
+ self.class.runblock :onexit
destroy!
end
end
def run
begin
@process_id = $$
save
-
+ self.class.runblock :onstart
scheduler = Rufus::Scheduler.start_new
Familia.info "Setting interval: #{Worker.interval} sec (poptimeout: #{Bluth.poptimeout})"
Familia.reconnect_all! # Need to reconnect after daemonize
- ## TODO: Works but needs to restart scheduler
- ##Signal.trap("USR1") do
- ## Worker.interval += 1
- ## Familia.info "Setting interval: #{Worker.interval} sec"
- ##end
- ##Signal.trap("USR2") do
- ## Worker.interval -= 1
- ## Familia.info "Setting interval: #{Worker.interval}"
- ##end
scheduler.every Worker.interval, :blocking => true do |task|
Familia.ld "#{$$} TICK @ #{Time.now.utc}"
sleep rand
find_gob task
end
scheduler.join
rescue => ex
msg = "#{ex.class}: #{ex.message}"
- STDERR.puts msg
- Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug?
+ Familia.info msg
+ Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug?
+ self.class.runblock :onexit
destroy!
rescue Interrupt => ex
puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n")
Exiting...
(You may need to wait up to #{Bluth.poptimeout} seconds
for this worker to exit cleanly.)
EOS
# We reconnect to the queue in case we're currently
# waiting on a brpop (blocking pop) timeout.
+ self.class.runblock :onexit
destroy!
end
end
private
- require 'benchmark'
+
# DO NOT return from this method
def find_gob(task=nil)
begin
job = Bluth.pop
unless job.nil?
- job.wid = self.id
+ job.wid = self.wid
if job.delayed?
job.attempts = 0
job.retry!
elsif !job.attempt?
job.failure! "Too many attempts"
else
job.stime = Time.now.utc.to_i
- self.working! job.id
+ self.working! job.jobid
tms = Benchmark.measure do
job.perform
end
job.cpu = [tms.utime.fineround(3),tms.stime.fineround(3),tms.real.fineround(3)]
job.save
@@ -241,116 +248,78 @@
# EM.stop
# exit 1
#end
end
end
-
end
+ # TODO: Refactor somehow. When this is subclassed
+ # (eg BS::SchduleWorker) the self.object is not created.
class ScheduleWorker < Storable
include WorkerBase
+ include Familia
+ include Logging
+ include Daemonizable
@interval = 20
@timeout = 60 #not working
+ @every = []
class << self
- attr_accessor :interval, :timeout
+ attr_accessor :interval, :timeout, :schedule
def interval(v=nil)
@interval = v unless v.nil?
@interval
end
+ def every interval=nil, opts={}, &blk
+ unless interval.nil?
+ @every << [interval, opts, blk]
+ end
+ @every
+ end
end
- include Familia
- include Logging
- include Daemonizable
prefix :scheduler
- index :id
+ index [:host, :user, :wid]
field :host
field :user
- field :id
+ field :wid
field :process_id => Integer
field :pid_file
field :log_file
- field :scheduled => Integer
- field :monitored => Integer
- field :timeouts => Integer
include Familia::Stamps
- attr_reader :schedule
- attr_reader :monitors
-
- def scheduled!(count=1)
- @scheduled ||= 0
- @scheduled += count
- update_time
- save
- end
- def monitored!(count=1)
- @monitored ||= 0
- @monitored += count
- update_time
- save
- end
- def timeout!(count=1)
- @timeouts ||= 0
- @timeouts += count
- update_time
- save
- end
+
def run!
run
end
+
def run
begin
- raise Familia::Problem, "Only 1 scheduler at a time" if ScheduleWorker.any?
-
+ raise Familia::Problem, "Only 1 scheduler at a time" if !ScheduleWorker.instances.empty?
EM.run {
@process_id = $$
srand(Bluth.salt.to_i(16) ** @process_id)
- @schedule = Rufus::Scheduler::EmScheduler.start_new
+ ScheduleWorker.schedule = Rufus::Scheduler::EmScheduler.start_new
save # persist and make note the scheduler is running
- prepare
- @schedule.every self.class.interval, :tags => :keeper do |keeper_task|
- begin
- scheduled_work(keeper_task)
- rescue => ex
- msg = "#{ex.class}: #{ex.message}"
- STDERR.puts msg
- STDERR.puts ex.backtrace
- Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug?
- end
- sleep rand # prevent thrashing
+ self.class.runblock :onstart
+ self.class.every.each do |args|
+ interval, opts, blk = *args
+ Familia.ld " scheduling every #{interval}: #{opts}"
+ ScheduleWorker.schedule.every interval, opts, &blk
end
}
rescue => ex
msg = "#{ex.class}: #{ex.message}"
puts msg
- STDERR.puts ex.backtrace
- Familia.ld :EXCEPTION, msg, caller[1] if Familia.debug?
+ Familia.info ex.backtrace
+ Familia.trace :EXCEPTION, msg, caller[1] if Familia.debug?
+ self.class.runblock :onexit
destroy!
rescue Interrupt => ex
puts $/, "Exiting..."
+ self.class.runblock :onexit
destroy!
end
end
- protected
-
- def prepare
- end
-
- def scheduled_work(keeper)
- STDOUT.puts "Come on!"
- end
-
end
+ Bluth.scheduler = Bluth::ScheduleWorker
end
-class Rufus::Scheduler::SchedulerCore
- # See lib/rufus/sc/scheduler.rb
- def handle_exception(job, exception)
- case exception
- when SystemExit
- exit
- else
- super
- end
- end
-end