lib/cloudtasker/worker.rb in cloudtasker-0.2.0 vs lib/cloudtasker/worker.rb in cloudtasker-0.3.0
- old
+ new
@@ -4,11 +4,11 @@
# Cloud Task based workers
module Worker
# Add class method to including class
def self.included(base)
base.extend(ClassMethods)
- base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued
+ base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries
end
#
# Return a worker instance from a serialized worker.
# A worker can be serialized by calling `MyWorker#to_json`
@@ -42,44 +42,44 @@
# Check that worker class is a valid worker
worker_klass = Object.const_get(klass_name)
return nil unless worker_klass.include?(self)
# Return instantiated worker
- worker_klass.new(payload.slice(:job_args, :job_id, :job_meta))
+ worker_klass.new(payload.slice(:job_args, :job_id, :job_meta, :job_retries))
rescue NameError
nil
end
# Module class methods
module ClassMethods
#
# Set the worker runtime options.
#
- # @param [Hash] opts The worker options
+ # @param [Hash] opts The worker options.
#
- # @return [<Type>] <description>
+ # @return [Hash] The options set.
#
def cloudtasker_options(opts = {})
- opt_list = opts&.map { |k, v| [k.to_s, v] } || [] # stringify
+ opt_list = opts&.map { |k, v| [k.to_sym, v] } || [] # symbolize
@cloudtasker_options_hash = Hash[opt_list]
end
#
# Return the worker runtime options.
#
# @return [Hash] The worker runtime options.
#
def cloudtasker_options_hash
- @cloudtasker_options_hash
+ @cloudtasker_options_hash || {}
end
#
# Enqueue worker in the backgroundf.
#
# @param [Array<any>] *args List of worker arguments
#
- # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
+ # @return [Cloudtasker::CloudTask] The Google Task response
#
def perform_async(*args)
perform_in(nil, *args)
end
@@ -87,11 +87,11 @@
# Enqueue worker and delay processing.
#
# @param [Integer, nil] interval The delay in seconds.
# @param [Array<any>] *args List of worker arguments.
#
- # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
+ # @return [Cloudtasker::CloudTask] The Google Task response
#
def perform_in(interval, *args)
new(job_args: args).schedule(interval: interval)
end
@@ -99,52 +99,84 @@
# Enqueue worker and delay processing.
#
# @param [Time, Integer] time_at The time at which the job should run.
# @param [Array<any>] *args List of worker arguments
#
- # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
+ # @return [Cloudtasker::CloudTask] The Google Task response
#
def perform_at(time_at, *args)
new(job_args: args).schedule(time_at: time_at)
end
+
+ #
+ # Return the numbeer of times this worker will be retried.
+ #
+ # @return [Integer] The number of retries.
+ #
+ def max_retries
+ cloudtasker_options_hash[:max_retries] || Cloudtasker.config.max_retries
+ end
end
#
# Build a new worker instance.
#
# @param [Array<any>] job_args The list of perform args.
# @param [String] job_id A unique ID identifying this job.
#
- def initialize(job_args: [], job_id: nil, job_meta: {})
+ def initialize(job_args: [], job_id: nil, job_meta: {}, job_retries: 0)
@job_args = job_args
@job_id = job_id || SecureRandom.uuid
@job_meta = MetaStore.new(job_meta)
+ @job_retries = job_retries || 0
end
#
+ # Return the Cloudtasker logger instance.
+ #
+ # @return [Logger, any] The cloudtasker logger.
+ #
+ def logger
+ @logger ||= WorkerLogger.new(self)
+ end
+
+ #
# Execute the worker by calling the `perform` with the args.
#
# @return [Any] The result of the perform.
#
def execute
- Cloudtasker.config.server_middleware.invoke(self) do
- perform(*job_args)
+ logger.info('Starting job...')
+ resp = Cloudtasker.config.server_middleware.invoke(self) do
+ begin
+ perform(*job_args)
+ rescue StandardError => e
+ try(:on_error, e)
+ return raise(e) unless job_dead?
+
+ # Flag job as dead
+ logger.info('Job dead')
+ try(:on_dead, e)
+ raise(DeadWorkerError, e)
+ end
end
+ logger.info('Job done')
+ resp
end
#
# Enqueue a worker, with or without delay.
#
# @param [Integer] interval The delay in seconds.
#
# @param [Time, Integer] interval The time at which the job should run
#
- # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
+ # @return [Cloudtasker::CloudTask] The Google Task response
#
def schedule(interval: nil, time_at: nil)
Cloudtasker.config.client_middleware.invoke(self) do
- Task.new(self).schedule(interval: interval, time_at: time_at)
+ WorkerHandler.new(self).schedule(interval: interval, time_at: time_at)
end
end
#
# Helper method used to re-enqueue the job. Re-enqueued
@@ -153,11 +185,11 @@
# This helper may be useful when jobs must pause activity due to external
# factors such as when a third-party API is throttling the rate of API calls.
#
# @param [Integer] interval Delay to wait before processing the job again (in seconds).
#
- # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
+ # @return [Cloudtasker::CloudTask] The Google Task response
#
def reenqueue(interval)
@job_reenqueued = true
schedule(interval: interval)
end
@@ -180,11 +212,12 @@
def to_h
{
worker: self.class.to_s,
job_id: job_id,
job_args: job_args,
- job_meta: job_meta.to_h
+ job_meta: job_meta.to_h,
+ job_retries: job_retries
}
end
#
# Return a json representation of the worker.
@@ -204,8 +237,18 @@
#
# @return [Boolean] True if the object is equal.
#
def ==(other)
other.is_a?(self.class) && other.job_id == job_id
+ end
+
+ #
+ # Return true if the job has excceeded its maximum number
+ # of retries
+ #
+ # @return [Boolean] True if the job is dead
+ #
+ def job_dead?
+ job_retries >= Cloudtasker.config.max_retries
end
end
end