# frozen_string_literal: true module Cloudtasker module Backend # Manage tasks pushed to GCP Cloud Task class GoogleCloudTask attr_accessor :gcp_task # # Create the queue configured in Cloudtasker if it does not already exist. # # @param [String] queue_name The relative name of the queue. # # @return [Google::Cloud::Tasks::V2beta3::Queue] The queue # def self.setup_queue(**opts) # Build full queue path queue_name = opts[:name] || Cloudtasker::Config::DEFAULT_JOB_QUEUE full_queue_name = queue_path(queue_name) # Try to get existing queue client.get_queue(full_queue_name) rescue Google::Gax::RetryError # Extract options concurrency = (opts[:concurrency] || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i retries = (opts[:retries] || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i # Create queue on 'not found' error client.create_queue( client.location_path(config.gcp_project_id, config.gcp_location_id), name: full_queue_name, retry_config: { max_attempts: retries }, rate_limits: { max_concurrent_dispatches: concurrency } ) end # # Return the Google Cloud Task client. # # @return [Google::Cloud::Tasks] The Google Cloud Task client. # def self.client @client ||= ::Google::Cloud::Tasks.new(version: :v2beta3) end # # Return the cloudtasker configuration. See Cloudtasker#configure. # # @return [Cloudtasker::Config] The library configuration. # def self.config Cloudtasker.config end # # Return the fully qualified path for the Cloud Task queue. # # @param [String] queue_name The relative name of the queue. # # @return [String] The queue path. # def self.queue_path(queue_name) client.queue_path( config.gcp_project_id, config.gcp_location_id, [config.gcp_queue_prefix, queue_name].join('-') ) end # # Return a protobuf timestamp specifying how to wait # before running a task. # # @param [Integer, nil] schedule_time A unix timestamp. # # @return [Google::Protobuf::Timestamp, nil] The protobuff timestamp # def self.format_schedule_time(schedule_time) return nil unless schedule_time # Generate protobuf timestamp Google::Protobuf::Timestamp.new.tap { |e| e.seconds = schedule_time.to_i } end # # Find a task by id. # # @param [String] id The task id. # # @return [Cloudtasker::Backend::GoogleCloudTask, nil] The retrieved task. # def self.find(id) resp = client.get_task(id) resp ? new(resp) : nil rescue Google::Gax::RetryError nil end # # Create a new task. # # @param [Hash] payload The task payload. # # @return [Cloudtasker::Backend::GoogleCloudTask, nil] The created task. # def self.create(payload) # Format payload payload = payload.merge( schedule_time: format_schedule_time(payload[:schedule_time]) ).compact # Extract relative queue name relative_queue = payload.delete(:queue) # Create task resp = client.create_task(queue_path(relative_queue), payload) resp ? new(resp) : nil rescue Google::Gax::RetryError nil end # # Delete a task by id. # # @param [String] id The id of the task. # def self.delete(id) client.delete_task(id) rescue Google::Gax::RetryError nil end # # Build a new instance of the class. # # @param [Google::Cloud::Tasks::V2beta3::Task] resp The GCP Cloud Task response # def initialize(gcp_task) @gcp_task = gcp_task end # # Return the relative queue (queue name minus prefix) the task is in. # # @return [String] The relative queue name # def relative_queue gcp_task .name .match(%r{/queues/([^/]+)}) &.captures &.first &.sub("#{self.class.config.gcp_queue_prefix}-", '') end # # Return a hash description of the task. # # @return [Hash] A hash description of the task. # def to_h { id: gcp_task.name, http_request: gcp_task.to_h[:http_request], schedule_time: gcp_task.to_h.dig(:schedule_time, :seconds).to_i, retries: gcp_task.to_h[:response_count], queue: relative_queue } end end end end