lib/stalker.rb in stalker-0.8.0 vs lib/stalker.rb in stalker-0.9.0
- old
+ new
@@ -2,182 +2,187 @@
require 'json'
require 'uri'
require 'timeout'
module Stalker
- extend self
+ extend self
- def connect(url)
- @@url = url
- beanstalk
- end
+ def connect(url)
+ @@url = url
+ beanstalk
+ end
- def enqueue(job, args={}, opts={})
- pri = opts[:pri] || 65536
- delay = opts[:delay] || 0
- ttr = opts[:ttr] || 120
- beanstalk.use job
- beanstalk.put [ job, args ].to_json, pri, delay, ttr
- rescue Beanstalk::NotConnected => e
- failed_connection(e)
- end
+ def enqueue(job, args={}, opts={})
+ pri = opts[:pri] || 65536
+ delay = [0, opts[:delay].to_i].max
+ ttr = opts[:ttr] || 120
+ beanstalk.use job
+ beanstalk.put [ job, args ].to_json, pri, delay, ttr
+ rescue Beanstalk::NotConnected => e
+ failed_connection(e)
+ end
- def job(j, &block)
- @@handlers ||= {}
- @@handlers[j] = block
- end
+ def job(j, &block)
+ @@handlers ||= {}
+ @@handlers[j] = block
+ end
- def before(&block)
- @@before_handlers ||= []
- @@before_handlers << block
- end
+ def before(&block)
+ @@before_handlers ||= []
+ @@before_handlers << block
+ end
- def error(&blk)
- @@error_handler = blk
- end
+ def error(&blk)
+ @@error_handler = blk
+ end
- class NoJobsDefined < RuntimeError; end
- class NoSuchJob < RuntimeError; end
+ class NoJobsDefined < RuntimeError; end
+ class NoSuchJob < RuntimeError; end
- def prep(jobs=nil)
- raise NoJobsDefined unless defined?(@@handlers)
- @@error_handler = nil unless defined?(@@error_handler)
+ def prep(jobs=nil)
+ raise NoJobsDefined unless defined?(@@handlers)
+ @@error_handler = nil unless defined?(@@error_handler)
- jobs ||= all_jobs
+ jobs ||= all_jobs
- jobs.each do |job|
- raise(NoSuchJob, job) unless @@handlers[job]
- end
+ jobs.each do |job|
+ raise(NoSuchJob, job) unless @@handlers[job]
+ end
- log "Working #{jobs.size} jobs: [ #{jobs.join(' ')} ]"
+ log "Working #{jobs.size} jobs: [ #{jobs.join(' ')} ]"
- jobs.each { |job| beanstalk.watch(job) }
+ jobs.each { |job| beanstalk.watch(job) }
- beanstalk.list_tubes_watched.each do |server, tubes|
- tubes.each { |tube| beanstalk.ignore(tube) unless jobs.include?(tube) }
- end
- rescue Beanstalk::NotConnected => e
- failed_connection(e)
- end
+ beanstalk.list_tubes_watched.each do |server, tubes|
+ tubes.each { |tube| beanstalk.ignore(tube) unless jobs.include?(tube) }
+ end
+ rescue Beanstalk::NotConnected => e
+ failed_connection(e)
+ end
- def work(jobs=nil)
- prep(jobs)
- loop { work_one_job }
- end
+ def work(jobs=nil)
+ prep(jobs)
+ loop { work_one_job }
+ end
- class JobTimeout < RuntimeError; end
+ class JobTimeout < RuntimeError; end
- def work_one_job
- job = beanstalk.reserve
- name, args = JSON.parse job.body
- log_job_begin(name, args)
- handler = @@handlers[name]
- raise(NoSuchJob, name) unless handler
+ def work_one_job
+ job = beanstalk.reserve
+ name, args = JSON.parse job.body
+ log_job_begin(name, args)
+ handler = @@handlers[name]
+ raise(NoSuchJob, name) unless handler
- begin
- Timeout::timeout(job.ttr - 1) do
- if defined? @@before_handlers and @@before_handlers.respond_to? :each
- @@before_handlers.each do |block|
- block.call(name)
- end
- end
- handler.call(args)
- end
- rescue Timeout::Error
- raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout"
- end
+ begin
+ Timeout::timeout(job.ttr - 1) do
+ if defined? @@before_handlers and @@before_handlers.respond_to? :each
+ @@before_handlers.each do |block|
+ block.call(name)
+ end
+ end
+ handler.call(args)
+ end
+ rescue Timeout::Error
+ raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout"
+ end
- job.delete
- log_job_end(name)
- rescue Beanstalk::NotConnected => e
- failed_connection(e)
- rescue SystemExit
- raise
- rescue => e
- log_error exception_message(e)
- job.bury rescue nil
- log_job_end(name, 'failed')
+ job.delete
+ log_job_end(name)
+ rescue Beanstalk::NotConnected => e
+ failed_connection(e)
+ rescue SystemExit
+ raise
+ rescue => e
+ log_error exception_message(e)
+ job.bury rescue nil
+ log_job_end(name, 'failed') if @job_begun
if error_handler
if error_handler.arity == 1
error_handler.call(e)
else
error_handler.call(e, name, args)
end
end
- end
+ end
- def failed_connection(e)
- log_error exception_message(e)
- log_error "*** Failed connection to #{beanstalk_url}"
- log_error "*** Check that beanstalkd is running (or set a different BEANSTALK_URL)"
- exit 1
- end
+ def failed_connection(e)
+ log_error exception_message(e)
+ log_error "*** Failed connection to #{beanstalk_url}"
+ log_error "*** Check that beanstalkd is running (or set a different BEANSTALK_URL)"
+ exit 1
+ end
- def log_job_begin(name, args)
- args_flat = unless args.empty?
- '(' + args.inject([]) do |accum, (key,value)|
- accum << "#{key}=#{value}"
- end.join(' ') + ')'
- else
- ''
- end
+ def log_job_begin(name, args)
+ args_flat = unless args.empty?
+ '(' + args.inject([]) do |accum, (key,value)|
+ accum << "#{key}=#{value}"
+ end.join(' ') + ')'
+ else
+ ''
+ end
- log [ "Working", name, args_flat ].join(' ')
- @job_begun = Time.now
- end
+ log [ "Working", name, args_flat ].join(' ')
+ @job_begun = Time.now
+ end
- def log_job_end(name, failed=false)
- ellapsed = Time.now - @job_begun
- ms = (ellapsed.to_f * 1000).to_i
- log "Finished #{name} in #{ms}ms #{failed ? ' (failed)' : ''}"
- end
+ def log_job_end(name, failed=false)
+ ellapsed = Time.now - @job_begun
+ ms = (ellapsed.to_f * 1000).to_i
+ log "Finished #{name} in #{ms}ms #{failed ? ' (failed)' : ''}"
+ end
- def log(msg)
- puts msg
- end
+ def log(msg)
+ puts msg
+ end
- def log_error(msg)
- STDERR.puts msg
- end
+ def log_error(msg)
+ STDERR.puts msg
+ end
- def beanstalk
- @@beanstalk ||= Beanstalk::Pool.new([ beanstalk_host_and_port ])
- end
+ def beanstalk
+ @@beanstalk ||= Beanstalk::Pool.new(beanstalk_addresses)
+ end
- def beanstalk_url
- return @@url if defined?(@@url) and @@url
- ENV['BEANSTALK_URL'] || 'beanstalk://localhost/'
- end
+ def beanstalk_url
+ return @@url if defined?(@@url) and @@url
+ ENV['BEANSTALK_URL'] || 'beanstalk://localhost/'
+ end
- class BadURL < RuntimeError; end
+ class BadURL < RuntimeError; end
- def beanstalk_host_and_port
- uri = URI.parse(beanstalk_url)
- raise(BadURL, beanstalk_url) if uri.scheme != 'beanstalk'
- return "#{uri.host}:#{uri.port || 11300}"
- end
+ def beanstalk_addresses
+ uris = beanstalk_url.split(/[\s,]+/)
+ uris.map {|uri| beanstalk_host_and_port(uri)}
+ end
- def exception_message(e)
- msg = [ "Exception #{e.class} -> #{e.message}" ]
+ def beanstalk_host_and_port(uri_string)
+ uri = URI.parse(uri_string)
+ raise(BadURL, uri_string) if uri.scheme != 'beanstalk'
+ "#{uri.host}:#{uri.port || 11300}"
+ end
- base = File.expand_path(Dir.pwd) + '/'
- e.backtrace.each do |t|
- msg << " #{File.expand_path(t).gsub(/#{base}/, '')}"
- end
+ def exception_message(e)
+ msg = [ "Exception #{e.class} -> #{e.message}" ]
- msg.join("\n")
- end
+ base = File.expand_path(Dir.pwd) + '/'
+ e.backtrace.each do |t|
+ msg << " #{File.expand_path(t).gsub(/#{base}/, '')}"
+ end
- def all_jobs
- @@handlers.keys
- end
+ msg.join("\n")
+ end
- def error_handler
- @@error_handler
- end
+ def all_jobs
+ @@handlers.keys
+ end
- def clear!
- @@handlers = nil
- @@before_handlers = nil
- @@error_handler = nil
- end
+ def error_handler
+ @@error_handler
+ end
+
+ def clear!
+ @@handlers = nil
+ @@before_handlers = nil
+ @@error_handler = nil
+ end
end