lib/cloudtasker/worker.rb in cloudtasker-0.13.2 vs lib/cloudtasker/worker.rb in cloudtasker-0.14.rc1

- old
+ new

@@ -53,19 +53,28 @@ end # Module class methods module ClassMethods # + # Return the Cloudtasker redis client + # + # @return [Cloudtasker::RedisClient] The cloudtasker redis client. + # + def redis + @redis ||= RedisClient.new + end + + # # Set the worker runtime options. # # @param [Hash] opts The worker options. # # @return [Hash] The options set. # def cloudtasker_options(opts = {}) opt_list = opts&.map { |k, v| [k.to_sym, v] } || [] # symbolize - @cloudtasker_options_hash = Hash[opt_list] + @cloudtasker_options_hash = opt_list.to_h end # # Return the worker runtime options. # @@ -74,10 +83,21 @@ def cloudtasker_options_hash @cloudtasker_options_hash || {} end # + # Return a namespaced cache key. + # + # @param [Any, Array<Any>, nil] val The key to namespace + # + # @return [String] The namespaced key(s). + # + def cache_key(val = nil) + [to_s.underscore, val].flatten.compact.map(&:to_s).join('/') + end + + # # Enqueue worker in the backgroundf. # # @param [Array<any>] *args List of worker arguments # # @return [Cloudtasker::CloudTask] The Google Task response @@ -172,17 +192,17 @@ # this duration is elapsed. # # @return [Integer] The value in seconds. # def dispatch_deadline - @dispatch_deadline ||= [ - [ - Config::MIN_DISPATCH_DEADLINE, - (self.class.cloudtasker_options_hash[:dispatch_deadline] || Cloudtasker.config.dispatch_deadline).to_i - ].max, - Config::MAX_DISPATCH_DEADLINE - ].min + @dispatch_deadline ||= begin + configured_deadline = ( + self.class.cloudtasker_options_hash[:dispatch_deadline] || + Cloudtasker.config.dispatch_deadline + ).to_i + configured_deadline.clamp(Config::MIN_DISPATCH_DEADLINE, Config::MAX_DISPATCH_DEADLINE) + end end # # Return the Cloudtasker logger instance. # @@ -202,17 +222,20 @@ # Perform job logic resp = execute_middleware_chain # Log job completion and return result - logger.info("Job done after #{job_duration}s") { { duration: job_duration } } + logger.info("Job done after #{job_duration}s") { { duration: job_duration * 1000 } } resp rescue DeadWorkerError => e - logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration } } + logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration * 1000 } } raise(e) + rescue RetryWorkerError => e + logger.info("Job done after #{job_duration}s (retry requested)") { { duration: job_duration * 1000 } } + raise(e) rescue StandardError => e - logger.info("Job failed after #{job_duration}s") { { duration: job_duration } } + logger.info("Job failed after #{job_duration}s") { { duration: job_duration * 1000 } } raise(e) end # # Return a unix timestamp specifying when to run the task. @@ -233,11 +256,11 @@ # Enqueue a worker, with or without delay. # # @param [Integer] interval The delay in seconds. # @param [Time, Integer] interval The time at which the job should run # - # @return [Cloudtasker::CloudTask] The Google Task response + # @return [Cloudtasker::CloudTask, nil] The Google Task response or nil if the job was not scheduled # def schedule(**args) # Evaluate when to schedule the job time_at = schedule_time(**args) @@ -321,11 +344,11 @@ # - Cloudtasker `max_retries` config option # # @return [Integer] The number of retries # def job_max_retries - @job_max_retries ||= (try(:max_retries, *job_args) || self.class.max_retries) + @job_max_retries ||= try(:max_retries, *job_args) || self.class.max_retries end # # Return true if the job must declared dead upon raising # an error. @@ -371,11 +394,11 @@ # @return [Float] The time taken in seconds as a floating point number. # def job_duration return 0.0 unless perform_ended_at && perform_started_at - (perform_ended_at - perform_started_at).ceil(3) + @job_duration ||= (perform_ended_at - perform_started_at).ceil(3) end # # Run worker callback. # @@ -420,10 +443,10 @@ begin # Perform the job perform(*job_args) rescue StandardError => e - run_callback(:on_error, e) + run_callback(:on_error, e) unless e.is_a?(RetryWorkerError) return raise(e) unless job_must_die? # Flag job as dead flag_as_dead(e) end