# frozen_string_literal: true

module Cloudtasker
  # Cloud Task based workers.
  #
  # Including this module in a class adds the class-level enqueueing helpers
  # defined in ClassMethods and the job accessors (`job_args`, `job_id`,
  # `job_meta`, `job_reenqueued`) to that class.
  module Worker
    # Add class methods and job accessors to the including class.
    #
    # @param [Class] base The class including this module.
    def self.included(base)
      base.extend(ClassMethods)
      base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued
    end

    #
    # Return a worker instance from a serialized worker.
    # A worker can be serialized by calling `MyWorker#to_json`
    #
    # @param [String] json Worker serialized as json.
    #
    # @return [Cloudtasker::Worker, nil] The instantiated worker, or nil
    #   when the JSON cannot be parsed or does not describe a valid worker.
    #
    def self.from_json(json)
      from_hash(JSON.parse(json))
    rescue JSON::ParserError
      nil
    end

    #
    # Return a worker instance from a worker hash description.
    # A worker hash description is typically generated by calling `MyWorker#to_h`
    #
    # @param [Hash] hash A worker hash description (string or symbol keys).
    #
    # @return [Cloudtasker::Worker, nil] The instantiated worker, or nil
    #   when the description does not reference a valid worker class.
    #
    def self.from_hash(hash)
      # Symbolize payload keys (deeply) through a JSON round trip
      payload = JSON.parse(hash.to_json, symbolize_names: true)
      return nil unless payload.is_a?(Hash)

      # Extract worker parameters
      klass_name = payload[:worker]
      return nil unless klass_name.is_a?(String)

      # Check that worker class is a valid worker. The Class check prevents
      # calling `include?` on arbitrary constants (e.g. a String constant).
      worker_klass = Object.const_get(klass_name)
      return nil unless worker_klass.is_a?(Class) && worker_klass.include?(self)

      # Return instantiated worker. The double splat is required: since
      # Ruby 3.0 a positional Hash is no longer converted to keyword arguments.
      worker_klass.new(**payload.slice(:job_args, :job_id, :job_meta))
    rescue NameError
      # Unknown constant name
      nil
    end

    # Module class methods
    module ClassMethods
      #
      # Set the worker runtime options.
      #
      # @param [Hash, nil] opts The worker options
      #
      # @return [Hash] The stored options, with stringified keys.
      #
      def cloudtasker_options(opts = {})
        # Stringify keys so options can be looked up uniformly
        @cloudtasker_options_hash = (opts || {}).map { |k, v| [k.to_s, v] }.to_h
      end

      #
      # Return the worker runtime options.
      #
      # @return [Hash] The worker runtime options. Empty when no options
      #   were declared via `cloudtasker_options`.
      #
      def cloudtasker_options_hash
        @cloudtasker_options_hash || {}
      end

      #
      # Enqueue worker in the background.
      #
      # @param [Array] *args List of worker arguments
      #
      # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
      #
      def perform_async(*args)
        perform_in(nil, *args)
      end

      #
      # Enqueue worker and delay processing.
      #
      # @param [Integer, nil] interval The delay in seconds.
      # @param [Array] *args List of worker arguments.
      #
      # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
      #
      def perform_in(interval, *args)
        new(job_args: args).schedule(interval: interval)
      end

      #
      # Enqueue worker and schedule processing at a specific time.
      #
      # @param [Time, Integer] time_at The time at which the job should run.
      # @param [Array] *args List of worker arguments
      #
      # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
      #
      def perform_at(time_at, *args)
        new(job_args: args).schedule(time_at: time_at)
      end
    end

    #
    # Build a new worker instance.
    #
    # @param [Array] job_args The list of perform args.
    # @param [String, nil] job_id A unique ID identifying this job.
    #   A random UUID is generated when omitted.
    # @param [Hash] job_meta Metadata attached to the job. It is wrapped
    #   in a MetaStore.
    #
    def initialize(job_args: [], job_id: nil, job_meta: {})
      @job_args = job_args
      @job_id = job_id || SecureRandom.uuid
      @job_meta = MetaStore.new(job_meta)
    end

    #
    # Execute the worker by calling `perform` with the job args.
    # The call goes through the configured server middleware chain.
    #
    # @return [Any] The result of the perform.
    #
    def execute
      Cloudtasker.config.server_middleware.invoke(self) do
        perform(*job_args)
      end
    end

    #
    # Enqueue a worker, with or without delay.
    # The call goes through the configured client middleware chain.
    #
    # @param [Integer, nil] interval The delay in seconds.
    # @param [Time, Integer, nil] time_at The time at which the job should run.
    #
    # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
    #
    def schedule(interval: nil, time_at: nil)
      Cloudtasker.config.client_middleware.invoke(self) do
        Task.new(self).schedule(interval: interval, time_at: time_at)
      end
    end

    #
    # Helper method used to re-enqueue the job. Re-enqueued
    # jobs keep the same job_id.
    #
    # This helper may be useful when jobs must pause activity due to external
    # factors such as when a third-party API is throttling the rate of API calls.
    #
    # @param [Integer] interval Delay to wait before processing the job again (in seconds).
    #
    # @return [Google::Cloud::Tasks::V2beta3::Task] The Google Task response
    #
    def reenqueue(interval)
      # Flag the job so that it can be identified as re-enqueued
      @job_reenqueued = true
      schedule(interval: interval)
    end

    #
    # Return a new instance of the worker with the same args and metadata
    # but with a different id.
    #
    # @return [Cloudtasker::Worker] A new instance of the same worker class.
    #
    def new_instance
      self.class.new(job_args: job_args, job_meta: job_meta)
    end

    #
    # Return a hash description of the worker.
    #
    # @return [Hash] The worker hash description.
    #
    def to_h
      {
        worker: self.class.to_s,
        job_id: job_id,
        job_args: job_args,
        job_meta: job_meta.to_h
      }
    end

    #
    # Return a json representation of the worker.
    #
    # @param [Array] *args Arguments passed to to_json.
    #
    # @return [String] The worker json representation.
    #
    def to_json(*args)
      to_h.to_json(*args)
    end

    #
    # Equality operator. Two workers are equal when they are of the
    # same class (or a subclass) and share the same job id.
    #
    # @param [Any] other The object to compare.
    #
    # @return [Boolean] True if the object is equal.
    #
    def ==(other)
      other.is_a?(self.class) && other.job_id == job_id
    end
  end
end