Sha256: 034ee4b165b6ab0802add365db6dc3253edd70dc4800ac91fcfc5d2b5d68497d

Contents?: true

Size: 1.7 KB

Versions: 3

Compression:

Stored size: 1.7 KB

Contents

# frozen_string_literal: true

module ActiveEventStore
  # Base job for async subscribers
  class SubscriberJob < ActiveJob::Base
    class << self
      attr_accessor :subscriber

      def from(callable)
        if callable.is_a?(Proc) || callable.name.nil?
          raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) " \
                                "could not be asynchronous (use sync: true)"
        end

        raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)

        if callable.const_defined?(:SubscriberJob, false)
          callable.const_get(:SubscriberJob, false)
        else
          callable.const_set(
            :SubscriberJob,
            Class.new(self).tap do |job|
              queue_as ActiveEventStore.config.job_queue_name

              job.subscriber = callable
            end
          )
        end
      end

      def for(callable)
        raise ArgumentError, "Async subscriber must be a module/class" unless callable.is_a?(Module)

        callable.const_defined?(:SubscriberJob, false) ?
          callable.const_get(:SubscriberJob, false) :
          nil
      end
    end

    def perform(payload)
      event = event_store.deserialize(**payload.symbolize_keys, serializer: ActiveEventStore.config.serializer)

      event_store.with_metadata(**event.metadata.to_h) do
        subscriber.call(event)
      end
    end

    private

    def subscriber
      if self.class.subscriber.is_a?(Class) && !self.class.subscriber.respond_to?(:call)
        return self.class.subscriber.new
      end

      self.class.subscriber
    end

    def event_store
      ActiveEventStore.event_store
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
active_event_store-1.2.1 lib/active_event_store/subscriber_job.rb
active_event_store-1.2.0 lib/active_event_store/subscriber_job.rb
active_event_store-1.1.0 lib/active_event_store/subscriber_job.rb