Sha256: c33754a480ac0db3f640649e1f507f6bcc6977afbfbad2c716eeb5ff04f92190

Contents?: true

Size: 999 Bytes

Versions: 1

Compression:

Stored size: 999 Bytes

Contents

module ActiveJob
  module QueueAdapters
    # Active Job queue adapter that hands jobs to Lambdakiq.
    #
    # Each job is sent through the queue object found under the job's
    # +queue_name+ in +Lambdakiq.client.queues+. Jobs answering true to
    # +lambdakiq_async?+ are sent from inside a +Concurrent::Promise+ instead
    # of inline; a failure there is logged rather than raised to the caller.
    class LambdakiqAdapter
      # Largest delay (in seconds) this adapter will ever request.
      # NOTE(review): 900 matches the Amazon SQS DelaySeconds maximum
      # (15 minutes) -- confirm that is the intent.
      MAX_DELAY_SECONDS = 900

      # Enqueues +job+, inline or asynchronously depending on the job.
      #
      # @param job [Object] must respond to +lambdakiq_async?+ and +queue_name+
      # @param options [Hash] forwarded untouched to the queue's +send_message+
      # @return [Object] the +send_message+ result for inline jobs, or the
      #   promise chain for async jobs
      def enqueue(job, options = {})
        job.lambdakiq_async? ? _enqueue_async(job, options) : _enqueue(job, options)
      end

      # Enqueues +job+ to become available at +timestamp+.
      #
      # @param job [Object] see #enqueue
      # @param timestamp [Numeric] target time as epoch seconds
      # @return [Object] see #enqueue
      def enqueue_at(job, timestamp)
        enqueue job, delay_seconds: delay_seconds(timestamp)
      end

      # Always true. Presumably consulted by Active Job to decide whether to
      # defer enqueueing until the surrounding transaction commits -- verify
      # against the Rails version in use.
      #
      # @return [Boolean]
      def enqueue_after_transaction_commit?
        true
      end

      private

      # Converts an absolute epoch timestamp into a whole-second delay from now.
      #
      # The result is clamped to 0..MAX_DELAY_SECONDS: a timestamp that has
      # already passed means "run as soon as possible" (previously this leaked
      # a negative delay to the queue), and anything further out than the
      # maximum is capped.
      #
      # @param timestamp [Numeric] target time as epoch seconds
      # @return [Integer] delay in seconds, within 0..MAX_DELAY_SECONDS
      def delay_seconds(timestamp)
        (timestamp - Time.current.to_i).to_i.clamp(0, MAX_DELAY_SECONDS)
      end

      # Sends +job+ inline through the queue named by +job.queue_name+.
      def _enqueue(job, options = {})
        queue = Lambdakiq.client.queues[job.queue_name]
        queue.send_message job, options
      end

      # Sends +job+ from a promise. The job is captured by the error callback
      # so the log line can identify which job failed.
      def _enqueue_async(job, options = {})
        Concurrent::Promise
          .execute { _enqueue(job, options) }
          .on_error { |e| async_enqueue_error(job, e) }
      end

      # Logs a failed asynchronous enqueue.
      #
      # +job+ is passed in explicitly: it was previously referenced here
      # without being in scope, which raised NameError inside the promise
      # callback, so the failure was never logged.
      def async_enqueue_error(job, error)
        msg = "[Lambdakiq] Failed to queue job #{job}. Reason: #{error}"
        Rails.logger.error(msg)
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
lambdakiq-2.3.0 lib/lambdakiq/adapter.rb