Sha256: 8b6df8e9ea4650ec0ebed5bde3aff255190faa3dc8c076c61bbc2c714d342f66
Contents?: true
Size: 1.18 KB
Versions: 1
Compression:
Stored size: 1.18 KB
Contents
# frozen_string_literal: true

require 'json'

module Quiq
  # Executes a single job described by a raw JSON message.
  #
  # The message is parsed as JSON and read through two keys:
  # `job_class` (name of the constant to instantiate) and `arguments`
  # (splatted into the instance's `perform`). Messages that cannot be parsed,
  # or whose job raises, are forwarded to the dead-letter queue (DLQ).
  class Job
    # @param raw [String] serialized job message
    # @param queue [#processing] object exposing the processing list that
    #   currently holds +raw+
    def initialize(raw, queue)
      @raw = raw
      @queue = queue
    end

    # Parses the message, runs the job inside an `Async` block and always
    # removes the message from the processing list afterwards.
    #
    # @return whatever the `Async` call returns
    def run
      Async do
        # Explicit begin/end kept so the block also parses on Rubies that do
        # not accept rescue/ensure directly inside do...end.
        begin
          # First parse the raw message from redis
          payload = JSON.parse(@raw)

          # Then load the definition of the job + its arguments
          # NOTE(security): `job_class` comes straight from the message, so any
          # loaded constant can be instantiated here. Only safe while producers
          # of the queue are trusted; consider an allow-list otherwise.
          klass = Object.const_get(payload['job_class'])
          args = payload['arguments']

          # Then run the task
          klass.new.perform(*args)
        rescue JSON::ParserError => e
          Quiq.logger.warn("Invalid format: #{e}")
          send_to_dlq(@raw, e)
        rescue StandardError => e
          Quiq.logger.debug("Sending message to DLQ: #{e}")
          send_to_dlq(payload, e)
        ensure
          # Remove the job from the processing list
          Queue.delete(@queue.processing, @raw)
        end
      end
    end

    private

    # Pushes the failed job to the dead-letter queue.
    #
    # @param payload [Hash, Object] parsed message, or anything else when the
    #   message could not be turned into a Hash
    # @param exception [Exception] the error that made the job fail
    def send_to_dlq(payload, exception)
      Queue.send_to_dlq(dlq_message(payload, exception))
    end

    # Builds the string stored in the DLQ: the payload enriched with `error`
    # and `backtrace` when it is a Hash, the untouched raw message otherwise.
    #
    # Serialization can itself fail (e.g. an exception message holding invalid
    # UTF-8 bytes). Because this runs from a rescue handler whose `ensure`
    # removes the job from the processing list, letting that error escape
    # would drop the job entirely, so the raw message is used as a fallback.
    #
    # @return [String] the message to enqueue
    def dlq_message(payload, exception)
      return @raw unless payload.is_a?(Hash)

      JSON.dump(payload.merge('error' => exception.to_s, 'backtrace' => exception.backtrace))
    rescue JSON::GeneratorError, EncodingError => e
      Quiq.logger.warn("Unable to serialize DLQ payload, sending raw message: #{e}")
      @raw
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
quiq-0.2.0 | lib/quiq/job.rb |