Sha256: 4b3494d33ef61fff6d23a45336a706f17754342f9487b5a61bde1662998a636e

Contents?: true

Size: 1.28 KB

Versions: 4

Compression:

Stored size: 1.28 KB

Contents

module Eventboss
  # UnitOfWork runs a listener against a single queue message, deletes the
  # message when processing succeeds, and changes the message's visibility
  # timeout when the listener asked for it to be postponed.
  class UnitOfWork
    include Logging
    include SafeThread

    attr_accessor :queue, :listener, :message

    # @param queue    object responding to #url
    # @param listener class instantiated via .new on every #run; instances
    #                 must respond to #receive and #postponed_by
    # @param message  object responding to #message_id, #body and
    #                 #receipt_handle
    def initialize(queue, listener, message)
      @queue = queue
      @listener = listener
      @message = message
      # `logger` is not defined in this class; presumably it is provided by
      # the Logging mixin - confirm.
      @logger = logger
    end

    # Processes the message with a fresh listener instance.
    #
    # Flow:
    # * the JSON-parsed message body is passed to the listener's #receive;
    # * a StandardError is forwarded to #handle_exception (defined outside
    #   this file, presumably by one of the included mixins);
    # * when nothing was raised and the listener did not postpone the
    #   message, the message is deleted from the queue;
    # * in every case, a postponed message gets its visibility timeout
    #   changed to the listener's #postponed_by value.
    #
    # @param client object responding to #delete_message and
    #               #change_message_visibility with the keyword arguments
    #               used below
    def run(client)
      logger.debug(@message.message_id) { 'Started' }
      processor = @listener.new
      processor.receive(JSON.parse(@message.body))
      logger.debug(@message.message_id) { 'Finished' }
    rescue StandardError => exception
      handle_exception(exception, processor: processor, message_id: @message.message_id)
    else
      cleanup(client) unless processor.postponed_by
    ensure
      # `processor` is still nil when @listener.new itself raised. Guard
      # against that so the ensure clause does not raise NoMethodError on
      # top of the failure that was already handled above.
      postponed_by = processor && processor.postponed_by
      change_message_visibility(client, postponed_by) if postponed_by
    end

    # Asks the client to change the visibility timeout of the current message.
    #
    # @param client       see #run
    # @param postponed_by value passed through unchanged as
    #                     `visibility_timeout`
    def change_message_visibility(client, postponed_by)
      client.change_message_visibility(
        queue_url: @queue.url,
        receipt_handle: @message.receipt_handle,
        visibility_timeout: postponed_by
      )
    end

    # Deletes the current message from the queue and logs the deletion.
    #
    # @param client see #run
    def cleanup(client)
      client.delete_message(
        queue_url: @queue.url, receipt_handle: @message.receipt_handle
      )
      logger.debug(@message.message_id) { 'Deleting' }
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
eventboss-1.3.1 lib/eventboss/unit_of_work.rb
eventboss-1.3.0 lib/eventboss/unit_of_work.rb
eventboss-1.2.1 lib/eventboss/unit_of_work.rb
eventboss-1.2.0 lib/eventboss/unit_of_work.rb