# frozen_string_literal: true require 'concurrent/delay' module GoodJob # # JobPerformer queries the database for jobs and performs them on behalf of a # {Scheduler}. It mainly functions as glue between a {Scheduler} and the jobs # it should be executing. # # The JobPerformer must be safe to execute across multiple threads. # class JobPerformer cattr_accessor :performing_active_job_ids, default: Concurrent::Set.new # @param queue_string [String] Queues to execute jobs from def initialize(queue_string, capsule: GoodJob.capsule) @queue_string = queue_string @capsule = capsule @metrics = Metrics.new end # A meaningful name to identify the performer in logs and for debugging. # @return [String] The queues from which Jobs are worked def name @queue_string end # Perform the next eligible job # @yield [Execution] Yields the execution, if one is dequeued # @return [Object, nil] Returns job result or +nil+ if no job was found def next active_job_id = nil @capsule.tracker.register do job_query.perform_with_advisory_lock(lock_id: @capsule.tracker.id_for_lock, parsed_queues: parsed_queues, queue_select_limit: GoodJob.configuration.queue_select_limit) do |execution| @metrics.touch_check_queue_at if execution active_job_id = execution.active_job_id performing_active_job_ids << active_job_id @metrics.touch_execution_at yield(execution) if block_given? else @metrics.increment_empty_executions end end.tap do |result| if result result.succeeded? ? @metrics.increment_succeeded_executions : @metrics.increment_errored_executions end end end ensure performing_active_job_ids.delete(active_job_id) if active_job_id end # Tests whether this performer should be used in GoodJob's current state. # # For example, state will be a LISTEN/NOTIFY message that is passed down # from the Notifier to the Scheduler. The Scheduler is able to ask # its performer "does this message relate to you?", and if not, ignore it # to minimize thread wake-ups, database queries, and thundering herds. # # @return [Boolean] whether the performer's {#next} method should be # called in the current state. def next?(state = {}) return true unless state[:queue_name] if parsed_queues[:exclude] parsed_queues[:exclude].exclude?(state[:queue_name]) elsif parsed_queues[:include] parsed_queues[:include].include?(state[:queue_name]) else true end end # The Returns timestamps of when next tasks may be available. # @param after [DateTime, Time, nil] future jobs scheduled after this time # @param limit [Integer] number of future timestamps to return # @param now_limit [Integer] number of past timestamps to return # @return [Array, nil] def next_at(after: nil, limit: nil, now_limit: nil) job_query.next_scheduled_at(after: after, limit: limit, now_limit: now_limit) end # Destroy expired preserved jobs # @return [void] def cleanup GoodJob.cleanup_preserved_jobs end # Metrics about this performer # @return [Hash] def stats { name: name, }.merge(@metrics.to_h) end # Reset metrics about this performer # @return [void] def reset_stats @metrics.reset end private attr_reader :queue_string def job_query @_job_query ||= GoodJob::Job.queue_string(queue_string) end def parsed_queues @_parsed_queues ||= GoodJob::Job.queue_parser(queue_string) end end end