lib/cloudtasker/worker.rb in cloudtasker-0.13.2 vs lib/cloudtasker/worker.rb in cloudtasker-0.14.rc1
- old
+ new
@@ -53,19 +53,28 @@
end
# Module class methods
module ClassMethods
#
+ # Return the Cloudtasker redis client
+ #
+ # @return [Cloudtasker::RedisClient] The cloudtasker redis client.
+ #
+ def redis
+ @redis ||= RedisClient.new
+ end
+
+ #
# Set the worker runtime options.
#
# @param [Hash] opts The worker options.
#
# @return [Hash] The options set.
#
def cloudtasker_options(opts = {})
opt_list = opts&.map { |k, v| [k.to_sym, v] } || [] # symbolize
- @cloudtasker_options_hash = Hash[opt_list]
+ @cloudtasker_options_hash = opt_list.to_h
end
#
# Return the worker runtime options.
#
@@ -74,10 +83,21 @@
def cloudtasker_options_hash
@cloudtasker_options_hash || {}
end
#
+ # Return a namespaced cache key.
+ #
+ # @param [Any, Array<Any>, nil] val The key to namespace
+ #
+ # @return [String] The namespaced key(s).
+ #
+ def cache_key(val = nil)
+ [to_s.underscore, val].flatten.compact.map(&:to_s).join('/')
+ end
+
+ #
# Enqueue worker in the backgroundf.
#
# @param [Array<any>] *args List of worker arguments
#
# @return [Cloudtasker::CloudTask] The Google Task response
@@ -172,17 +192,17 @@
# this duration is elapsed.
#
# @return [Integer] The value in seconds.
#
def dispatch_deadline
- @dispatch_deadline ||= [
- [
- Config::MIN_DISPATCH_DEADLINE,
- (self.class.cloudtasker_options_hash[:dispatch_deadline] || Cloudtasker.config.dispatch_deadline).to_i
- ].max,
- Config::MAX_DISPATCH_DEADLINE
- ].min
+ @dispatch_deadline ||= begin
+ configured_deadline = (
+ self.class.cloudtasker_options_hash[:dispatch_deadline] ||
+ Cloudtasker.config.dispatch_deadline
+ ).to_i
+ configured_deadline.clamp(Config::MIN_DISPATCH_DEADLINE, Config::MAX_DISPATCH_DEADLINE)
+ end
end
#
# Return the Cloudtasker logger instance.
#
@@ -202,17 +222,20 @@
# Perform job logic
resp = execute_middleware_chain
# Log job completion and return result
- logger.info("Job done after #{job_duration}s") { { duration: job_duration } }
+ logger.info("Job done after #{job_duration}s") { { duration: job_duration * 1000 } }
resp
rescue DeadWorkerError => e
- logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration } }
+ logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration * 1000 } }
raise(e)
+ rescue RetryWorkerError => e
+ logger.info("Job done after #{job_duration}s (retry requested)") { { duration: job_duration * 1000 } }
+ raise(e)
rescue StandardError => e
- logger.info("Job failed after #{job_duration}s") { { duration: job_duration } }
+ logger.info("Job failed after #{job_duration}s") { { duration: job_duration * 1000 } }
raise(e)
end
#
# Return a unix timestamp specifying when to run the task.
@@ -233,11 +256,11 @@
# Enqueue a worker, with or without delay.
#
# @param [Integer] interval The delay in seconds.
# @param [Time, Integer] interval The time at which the job should run
#
- # @return [Cloudtasker::CloudTask] The Google Task response
+ # @return [Cloudtasker::CloudTask, nil] The Google Task response or nil if the job was not scheduled
#
def schedule(**args)
# Evaluate when to schedule the job
time_at = schedule_time(**args)
@@ -321,11 +344,11 @@
# - Cloudtasker `max_retries` config option
#
# @return [Integer] The number of retries
#
def job_max_retries
- @job_max_retries ||= (try(:max_retries, *job_args) || self.class.max_retries)
+ @job_max_retries ||= try(:max_retries, *job_args) || self.class.max_retries
end
#
# Return true if the job must declared dead upon raising
# an error.
@@ -371,11 +394,11 @@
# @return [Float] The time taken in seconds as a floating point number.
#
def job_duration
return 0.0 unless perform_ended_at && perform_started_at
- (perform_ended_at - perform_started_at).ceil(3)
+ @job_duration ||= (perform_ended_at - perform_started_at).ceil(3)
end
#
# Run worker callback.
#
@@ -420,10 +443,10 @@
begin
# Perform the job
perform(*job_args)
rescue StandardError => e
- run_callback(:on_error, e)
+ run_callback(:on_error, e) unless e.is_a?(RetryWorkerError)
return raise(e) unless job_must_die?
# Flag job as dead
flag_as_dead(e)
end