lib/cloudtasker/worker.rb in cloudtasker-0.2.0 vs lib/cloudtasker/worker.rb in cloudtasker-0.3.0

- old
+ new

@@ -4,11 +4,11 @@ # Cloud Task based workers module Worker # Add class method to including class def self.included(base) base.extend(ClassMethods) - base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued + base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries end # # Return a worker instance from a serialized worker. # A worker can be serialized by calling `MyWorker#to_json` @@ -42,44 +42,44 @@ # Check that worker class is a valid worker worker_klass = Object.const_get(klass_name) return nil unless worker_klass.include?(self) # Return instantiated worker - worker_klass.new(payload.slice(:job_args, :job_id, :job_meta)) + worker_klass.new(payload.slice(:job_args, :job_id, :job_meta, :job_retries)) rescue NameError nil end # Module class methods module ClassMethods # # Set the worker runtime options. # - # @param [Hash] opts The worker options + # @param [Hash] opts The worker options. # - # @return [<Type>] <description> + # @return [Hash] The options set. # def cloudtasker_options(opts = {}) - opt_list = opts&.map { |k, v| [k.to_s, v] } || [] # stringify + opt_list = opts&.map { |k, v| [k.to_sym, v] } || [] # symbolize @cloudtasker_options_hash = Hash[opt_list] end # # Return the worker runtime options. # # @return [Hash] The worker runtime options. # def cloudtasker_options_hash - @cloudtasker_options_hash + @cloudtasker_options_hash || {} end # # Enqueue worker in the backgroundf. # # @param [Array<any>] *args List of worker arguments # - # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response + # @return [Cloudtasker::CloudTask] The Google Task response # def perform_async(*args) perform_in(nil, *args) end @@ -87,11 +87,11 @@ # Enqueue worker and delay processing. # # @param [Integer, nil] interval The delay in seconds. # @param [Array<any>] *args List of worker arguments. # - # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response + # @return [Cloudtasker::CloudTask] The Google Task response # def perform_in(interval, *args) new(job_args: args).schedule(interval: interval) end @@ -99,52 +99,84 @@ # Enqueue worker and delay processing. # # @param [Time, Integer] time_at The time at which the job should run. # @param [Array<any>] *args List of worker arguments # - # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response + # @return [Cloudtasker::CloudTask] The Google Task response # def perform_at(time_at, *args) new(job_args: args).schedule(time_at: time_at) end + + # + # Return the numbeer of times this worker will be retried. + # + # @return [Integer] The number of retries. + # + def max_retries + cloudtasker_options_hash[:max_retries] || Cloudtasker.config.max_retries + end end # # Build a new worker instance. # # @param [Array<any>] job_args The list of perform args. # @param [String] job_id A unique ID identifying this job. # - def initialize(job_args: [], job_id: nil, job_meta: {}) + def initialize(job_args: [], job_id: nil, job_meta: {}, job_retries: 0) @job_args = job_args @job_id = job_id || SecureRandom.uuid @job_meta = MetaStore.new(job_meta) + @job_retries = job_retries || 0 end # + # Return the Cloudtasker logger instance. + # + # @return [Logger, any] The cloudtasker logger. + # + def logger + @logger ||= WorkerLogger.new(self) + end + + # # Execute the worker by calling the `perform` with the args. # # @return [Any] The result of the perform. # def execute - Cloudtasker.config.server_middleware.invoke(self) do - perform(*job_args) + logger.info('Starting job...') + resp = Cloudtasker.config.server_middleware.invoke(self) do + begin + perform(*job_args) + rescue StandardError => e + try(:on_error, e) + return raise(e) unless job_dead? + + # Flag job as dead + logger.info('Job dead') + try(:on_dead, e) + raise(DeadWorkerError, e) + end end + logger.info('Job done') + resp end # # Enqueue a worker, with or without delay. # # @param [Integer] interval The delay in seconds. # # @param [Time, Integer] interval The time at which the job should run # - # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response + # @return [Cloudtasker::CloudTask] The Google Task response # def schedule(interval: nil, time_at: nil) Cloudtasker.config.client_middleware.invoke(self) do - Task.new(self).schedule(interval: interval, time_at: time_at) + WorkerHandler.new(self).schedule(interval: interval, time_at: time_at) end end # # Helper method used to re-enqueue the job. Re-enqueued @@ -153,11 +185,11 @@ # This helper may be useful when jobs must pause activity due to external # factors such as when a third-party API is throttling the rate of API calls. # # @param [Integer] interval Delay to wait before processing the job again (in seconds). # - # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response + # @return [Cloudtasker::CloudTask] The Google Task response # def reenqueue(interval) @job_reenqueued = true schedule(interval: interval) end @@ -180,11 +212,12 @@ def to_h { worker: self.class.to_s, job_id: job_id, job_args: job_args, - job_meta: job_meta.to_h + job_meta: job_meta.to_h, + job_retries: job_retries } end # # Return a json representation of the worker. @@ -204,8 +237,18 @@ # # @return [Boolean] True if the object is equal. # def ==(other) other.is_a?(self.class) && other.job_id == job_id + end + + # + # Return true if the job has excceeded its maximum number + # of retries + # + # @return [Boolean] True if the job is dead + # + def job_dead? + job_retries >= Cloudtasker.config.max_retries end end end