Sha256: c2e52f6e77c5f9250f8260d171a8a96b776a1de6cb5dff7a6560b82e302c7554
Contents?: true
Size: 1.61 KB
Versions: 1
Compression:
Stored size: 1.61 KB
Contents
# frozen_string_literal: true module AsyncActiveJob class Adapter # @param active_job [ActiveJob::Base] the job to be enqueued from +#perform_later+ # @return [AsyncActiveJob::Job] def enqueue(active_job) enqueue_at(active_job, nil) end # @param active_job [ActiveJob::Base] the job to be enqueued from +#perform_later+ # @param timestamp [Integer, nil] the epoch time to perform the job # @return [AsyncActiveJob::Job] def enqueue_at(active_job, timestamp) scheduled_at = timestamp ? Time.zone.at(timestamp) : nil opts = { active_job: active_job, scheduled_at: scheduled_at } ActiveSupport::Notifications.instrument('enqueue_job.async_active_job', opts) do |instrument_payload| async_active_job = AsyncActiveJob::Job.enqueue( JobWrapper.new(active_job.serialize), queue_name: active_job.queue_name || AsyncActiveJob.configuration.default_queue_name, priority: active_job.priority || AsyncActiveJob.configuration.default_priority, run_at: scheduled_at || Time.zone.now ) instrument_payload[:async_active_job] = async_active_job active_job.provider_job_id = async_active_job.id async_active_job end end class JobWrapper # :nodoc: attr_accessor :job_data def initialize(job_data) @job_data = job_data end def active_job_class job_data['job_class'].constantize end def perform ActiveJob::Base.execute(job_data) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
async_active_job-0.1.0 | lib/async_active_job/adapter.rb |