Sha256: 4e6688758b8008ef0a75ee7d699e2b3f9747a829fdc473dbedc887b9e6e89be0

Contents?: true

Size: 1.41 KB

Versions: 4

Compression:

Stored size: 1.41 KB

Contents

module ActiveJob::QueueAdapters::SolidQueueExt::RecurringTasks
  def supports_recurring_tasks?
    true
  end

  def recurring_tasks
    tasks = recurring_tasks_from_dispatchers
    last_enqueued_at_times = recurring_task_last_enqueued_at(tasks.keys)

    recurring_tasks_from_dispatchers.collect do |task_id, task_attrs|
      recurring_task_attributes_from_solid_queue_task_attributes(task_attrs).merge \
        id: task_id,
        last_enqueued_at: last_enqueued_at_times[task_id]
    end
  end

  def find_recurring_task(task_id)
    if task_attrs = recurring_tasks_from_dispatchers[task_id]
      recurring_task_attributes_from_solid_queue_task_attributes(task_attrs).merge \
        id: task_id,
        last_enqueued_at: recurring_task_last_enqueued_at(task_id).values&.first
    end
  end

  private
    def recurring_tasks_from_dispatchers
      SolidQueue::Process.where(kind: "Dispatcher").flat_map do |process|
        process.metadata["recurring_schedule"]
      end.compact.reduce({}, &:merge)
    end

    def recurring_task_attributes_from_solid_queue_task_attributes(task_attributes)
      {
        job_class_name: task_attributes["class_name"],
        arguments: task_attributes["arguments"],
        schedule: task_attributes["schedule"]
      }
    end

    def recurring_task_last_enqueued_at(task_keys)
      SolidQueue::RecurringExecution.where(task_key: task_keys).group(:task_key).maximum(:run_at)
    end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
mission_control-jobs-0.3.0 lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb
mission_control-jobs-0.2.2 lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb
mission_control-jobs-0.2.1 lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb
mission_control-jobs-0.2.0 lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb