lib/backburner/worker.rb in backburner-0.4.2 vs lib/backburner/worker.rb in backburner-0.4.3
- old
+ new
@@ -25,54 +25,56 @@
#
def self.enqueue(job_class, args=[], opts={})
pri = resolve_priority(opts[:pri] || job_class)
delay = [0, opts[:delay].to_i].max
ttr = opts[:ttr] || Backburner.configuration.respond_timeout
- tube = connection.tubes[expand_tube_name(opts[:queue] || job_class)]
res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)
return false unless res # stop if hook is false
data = { :class => job_class.name, :args => args }
- tube.put data.to_json, :pri => pri, :delay => delay, :ttr => ttr
+ retryable_command do
+ tube = connection.tubes[expand_tube_name(opts[:queue] || job_class)]
+ tube.put(data.to_json, :pri => pri, :delay => delay, :ttr => ttr)
+ end
Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
return true
- rescue Beaneater::NotConnected => e
- retry_connection!
end
# Starts processing jobs with the specified tube_names.
#
# @example
# Backburner::Worker.start(["foo.tube.name"])
#
def self.start(tube_names=nil)
- self.new(tube_names).start
+ begin
+ self.new(tube_names).start
+ rescue SystemExit
+ # do nothing
+ end
end
# Returns the worker connection.
# @example
# Backburner::Worker.connection # => <Beaneater::Pool>
def self.connection
@connection ||= Connection.new(Backburner.configuration.beanstalk_url)
end
- # Retries to make a connection to beanstalkd if that connection failed.
+ # Retries the given command specified in the block several times if there is a connection error
+ # Used to execute beanstalkd commands in a retryable way
+ #
+ # @example
+ # retryable_command { ... }
# @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.
- def self.retry_connection!(max_tries=5)
- retry_count = 0
+ #
+ def self.retryable_command(max_tries=8, &block)
begin
- @connection = nil
- self.connection
+ yield
rescue Beaneater::NotConnected => e
- if retry_count < max_tries
- retry_count += 1
- sleep 0.5
- retry
- else # stop retrying
- raise e
- end
+ retry_connection!(max_tries)
+ yield
end
- end # retry_connection!
+ end
# List of tube names to be watched and processed
attr_accessor :tube_names
# Constructs a new worker for processing jobs within specified tubes.
@@ -109,11 +111,11 @@
end
# Triggers this worker to shutdown
def shutdown
log_info 'Worker exiting...'
- Kernel.exit!
+ Kernel.exit
end
# Processes tube_names given tube_names array.
# Should return normalized tube_names as an array of strings.
#
@@ -153,9 +155,44 @@
job.bury
self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at
end
handle_error(e, job.name, job.args)
end
+
+ # Retries the given command specified in the block several times if there is a connection error
+ # Used to execute beanstalkd commands in a retryable way
+ #
+ # @example
+ # retryable_command { ... }
+ # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.
+ #
+ def self.retryable_command(max_tries=8, &block)
+ begin
+ yield
+ rescue Beaneater::NotConnected => e
+ retry_connection!(max_tries)
+ yield
+ end
+ end
+
+ # Retries to make a connection to beanstalkd if that connection failed.
+ # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times.
+ #
+ def self.retry_connection!(max_tries=8)
+ retry_count = 0
+ begin
+ @connection = nil
+ self.connection.stats
+ rescue Beaneater::NotConnected => e
+ if retry_count < max_tries
+ retry_count += 1
+ sleep 1
+ retry
+ else # stop retrying
+ raise e
+ end
+ end
+ end # retry_connection!
protected
# Returns a list of all tubes known within the system
# Filtered for tubes that match the known prefix
\ No newline at end of file