Sha256: 8b6df8e9ea4650ec0ebed5bde3aff255190faa3dc8c076c61bbc2c714d342f66

Contents?: true

Size: 1.18 KB

Versions: 1

Compression:

Stored size: 1.18 KB

Contents

# frozen_string_literal: true

require 'json'

module Quiq
  class Job
    def initialize(raw, queue)
      @raw = raw
      @queue = queue
    end

    def run
      Async do
        begin
          # First parse the raw message from redis
          payload = JSON.parse(@raw)

          # Then load the definition of the job + its arguments
          klass = Object.const_get(payload['job_class'])
          args = payload['arguments']

          # Then run the task
          klass.new.perform(*args)
        rescue JSON::ParserError => e
          Quiq.logger.warn("Invalid format: #{e}")
          send_to_dlq(@raw, e)
        rescue StandardError => e
          Quiq.logger.debug("Sending message to DLQ: #{e}")
          send_to_dlq(payload, e)
        ensure
          # Remove the job from the processing list
          Queue.delete(@queue.processing, @raw)
        end
      end
    end

    private

    def send_to_dlq(payload, exception)
      if payload.is_a?(Hash)
        payload['error'] = exception.to_s
        payload['backtrace'] = exception.backtrace
        message = JSON.dump(payload)
      else
        message = @raw
      end

      Queue.send_to_dlq(message)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
quiq-0.2.0 lib/quiq/job.rb