Sha256: 6eca509bc0a7a10dd1868d6618cd6c48f8389f747ab3e04a45ba499e7fda1675

Contents?: true

Size: 931 Bytes

Versions: 9

Compression:

Stored size: 931 Bytes

Contents

module ActiveJob
  module QueueAdapters
    class LambdakiqAdapter

      def enqueue(job, options = {})
        job.lambdakiq_async? ? _enqueue_async(job, options) : _enqueue(job, options)
      end

      def enqueue_at(job, timestamp)
        enqueue job, delay_seconds: delay_seconds(timestamp)
      end

      private

      def delay_seconds(timestamp)
        ds = (timestamp - Time.current.to_i).to_i
        [ds, 900].min
      end

      def _enqueue(job, options = {})
        queue = Lambdakiq.client.queues[job.queue_name]
        queue.send_message job, options
      end

      def _enqueue_async(job, options = {})
        Concurrent::Promise
          .execute { _enqueue(job, options) }
          .on_error { |e| async_enqueue_error(e) }
      end

      def async_enqueue_error(e)
        msg = "[Lambdakiq] Failed to queue job #{job}. Reason: #{e}"
        Rails.logger.error(msg)
      end

    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
lambdakiq-2.2.0 lib/lambdakiq/adapter.rb
lambdakiq-2.1.0 lib/lambdakiq/adapter.rb
lambdakiq-2.0.2 lib/lambdakiq/adapter.rb
lambdakiq-2.0.1 lib/lambdakiq/adapter.rb
lambdakiq-2.0.0 lib/lambdakiq/adapter.rb
lambdakiq-1.0.4 lib/lambdakiq/adapter.rb
lambdakiq-1.0.3 lib/lambdakiq/adapter.rb
lambdakiq-1.0.2 lib/lambdakiq/adapter.rb
lambdakiq-1.0.1 lib/lambdakiq/adapter.rb