Sha256: 95c6e7826b2d45613d6506844da40caa1c1053e15267a81eaf875da79f346a66

Contents?: true

Size: 1.44 KB

Versions: 5

Compression:

Stored size: 1.44 KB

Contents

# frozen_string_literal: true

require 'cyclone_lariat/core'
require 'cyclone_lariat/clients/sns'
require 'cyclone_lariat/plugins/outbox/configurable'
require 'cyclone_lariat/plugins/outbox/loadable'
require 'cyclone_lariat/plugins/outbox/extensions/active_record_outbox'
require 'cyclone_lariat/plugins/outbox/extensions/sequel_outbox'
require 'cyclone_lariat/plugins/outbox/repo/messages'

module CycloneLariat
  # Buffers messages that were stored through +repo+ and later sends them
  # with +sns_client+.
  #
  # Every message pushed into the outbox is first written via +repo.create+.
  # #publish then tries to send each buffered message and deletes the ones
  # that were sent from the repo. For a message that fails to send, the error
  # text is recorded through +repo.update_error+ and the record is left in
  # place (presumably for a later retry - confirm against the resend logic,
  # which is not part of this file).
  class Outbox
    extend CycloneLariat::Outbox::Configurable
    extend CycloneLariat::Outbox::Loadable
    include LunaPark::Extensions::Injector

    dependency(:sns_client) { CycloneLariat::Clients::Sns.new }
    dependency(:repo)       { CycloneLariat::Outbox::Repo::Messages.new }

    # @return [Array] messages pushed into this outbox, in insertion order
    attr_reader :messages

    def initialize
      @messages = []
    end

    # Sends every buffered message and deletes the sent ones from the repo.
    #
    # A failure to send one message does not stop the others: the error is
    # recorded with +repo.update_error+, the optional
    # +config.on_sending_error+ callback is invoked with the message and the
    # exception, and processing moves on to the next message.
    #
    # If that error handling itself raises (for example a callback that
    # re-raises), the exception still propagates to the caller, but the
    # messages that were already sent are deleted from the repo first so they
    # are not left behind as if they had never been delivered.
    #
    # NOTE(review): the in-memory buffer is not cleared here, so calling
    # #publish twice on the same instance sends the buffered messages again.
    #
    # @return [Object, nil] result of +repo.delete+, or nil when nothing was sent
    # @raise [StandardError] whatever +repo.update_error+, the
    #   +on_sending_error+ callback or +repo.delete+ raises
    def publish
      sent_uuids = []

      begin
        messages.each { |message| publish_message(message, sent_uuids) }
      rescue StandardError
        # Error handling for some message blew up: still clean up what was
        # delivered so far, then let the original exception continue.
        delete_sent(sent_uuids)
        raise
      end

      delete_sent(sent_uuids)
    end

    # Stores the message through the repo, assigns the value returned by
    # +repo.create+ as its uuid and appends it to the buffer.
    #
    # NOTE(review): returns the internal messages Array, not +self+, so a
    # chained <tt>outbox << a << b</tt> appends +b+ straight to the Array
    # without going through +repo.create+. Push one message per call.
    #
    # @param message [Object] message responding to +uuid=+
    # @return [Array] the internal messages buffer
    def <<(message)
      message.uuid = repo.create(message)
      messages << message
    end

    # Same as #<<.
    #
    # @param message [Object] message responding to +uuid=+
    # @return [Array] the internal messages buffer
    def push(message)
      self << message
    end

    private

    # Sends a single message and, on success, appends its uuid to
    # +sent_uuids+. On failure records the error and notifies the optional
    # callback; exceptions raised by that handling are not caught here.
    def publish_message(message, sent_uuids)
      sns_client.publish message, fifo: message.fifo?
      sent_uuids << message.uuid
    rescue StandardError => e
      repo.update_error(message.uuid, e.message)
      config.on_sending_error&.call(message, e)
    end

    # Deletes the given uuids from the repo; skips the call for an empty list.
    def delete_sent(uuids)
      repo.delete(uuids) unless uuids.empty?
    end

    def config
      self.class.config
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
cyclone_lariat-1.0.0 lib/cyclone_lariat/plugins/outbox.rb
cyclone_lariat-1.0.0.rc9 lib/cyclone_lariat/plugins/outbox.rb
cyclone_lariat-1.0.0.rc8 lib/cyclone_lariat/plugins/outbox.rb
cyclone_lariat-1.0.0.rc7 lib/cyclone_lariat/plugins/outbox.rb
cyclone_lariat-1.0.0.rc6 lib/cyclone_lariat/plugins/outbox.rb