lib/backburner/worker.rb in backburner-0.4.2 vs lib/backburner/worker.rb in backburner-0.4.3

- old
+ new

@@ -25,54 +25,56 @@ # def self.enqueue(job_class, args=[], opts={}) pri = resolve_priority(opts[:pri] || job_class) delay = [0, opts[:delay].to_i].max ttr = opts[:ttr] || Backburner.configuration.respond_timeout - tube = connection.tubes[expand_tube_name(opts[:queue] || job_class)] res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args) return false unless res # stop if hook is false data = { :class => job_class.name, :args => args } - tube.put data.to_json, :pri => pri, :delay => delay, :ttr => ttr + retryable_command do + tube = connection.tubes[expand_tube_name(opts[:queue] || job_class)] + tube.put(data.to_json, :pri => pri, :delay => delay, :ttr => ttr) + end Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args) return true - rescue Beaneater::NotConnected => e - retry_connection! end # Starts processing jobs with the specified tube_names. # # @example # Backburner::Worker.start(["foo.tube.name"]) # def self.start(tube_names=nil) - self.new(tube_names).start + begin + self.new(tube_names).start + rescue SystemExit + # do nothing + end end # Returns the worker connection. # @example # Backburner::Worker.connection # => <Beaneater::Pool> def self.connection @connection ||= Connection.new(Backburner.configuration.beanstalk_url) end - # Retries to make a connection to beanstalkd if that connection failed. + # Retries the given command specified in the block several times if there is a connection error + # Used to execute beanstalkd commands in a retryable way + # + # @example + # retryable_command { ... } # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times. - def self.retry_connection!(max_tries=5) - retry_count = 0 + # + def self.retryable_command(max_tries=8, &block) begin - @connection = nil - self.connection + yield rescue Beaneater::NotConnected => e - if retry_count < max_tries - retry_count += 1 - sleep 0.5 - retry - else # stop retrying - raise e - end + retry_connection!(max_tries) + yield end - end # retry_connection! + end # List of tube names to be watched and processed attr_accessor :tube_names # Constructs a new worker for processing jobs within specified tubes. @@ -109,11 +111,11 @@ end # Triggers this worker to shutdown def shutdown log_info 'Worker exiting...' - Kernel.exit! + Kernel.exit end # Processes tube_names given tube_names array. # Should return normalized tube_names as an array of strings. # @@ -153,9 +155,44 @@ job.bury self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at end handle_error(e, job.name, job.args) end + + # Retries the given command specified in the block several times if there is a connection error + # Used to execute beanstalkd commands in a retryable way + # + # @example + # retryable_command { ... } + # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times. + # + def self.retryable_command(max_tries=8, &block) + begin + yield + rescue Beaneater::NotConnected => e + retry_connection!(max_tries) + yield + end + end + + # Retries to make a connection to beanstalkd if that connection failed. + # @raise [Beaneater::NotConnected] If beanstalk fails to connect multiple times. + # + def self.retry_connection!(max_tries=8) + retry_count = 0 + begin + @connection = nil + self.connection.stats + rescue Beaneater::NotConnected => e + if retry_count < max_tries + retry_count += 1 + sleep 1 + retry + else # stop retrying + raise e + end + end + end # retry_connection! protected # Returns a list of all tubes known within the system # Filtered for tubes that match the known prefix \ No newline at end of file