Sha256: 2879e2bac2f6733a713047ac268f4f38edef0e075a271b15af747820547f5c39

Contents?: true

Size: 1.69 KB

Versions: 1

Compression:

Stored size: 1.69 KB

Contents

# frozen_string_literal: true

module ActiveEventStore
  # Base job for async subscribers
  class SubscriberJob < ActiveJob::Base
    class << self
      attr_accessor :subscriber

      def from(callable)
        if callable.is_a?(Proc) || callable.name.nil?
          raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) " \
                                "could not be asynchronous (use sync: true)"
        end

        raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)

        if callable.const_defined?(:SubscriberJob, false)
          callable.const_get(:SubscriberJob, false)
        else
          callable.const_set(
            :SubscriberJob,
            Class.new(self).tap do |job|
              queue_as ActiveEventStore.config.job_queue_name

              job.subscriber = callable
            end
          )
        end
      end

      def for(callable)
        raise ArgumentError, "Async subscriber must be a module/class" unless callable.is_a?(Module)

        callable.const_defined?(:SubscriberJob, false) ?
          callable.const_get(:SubscriberJob, false) :
          nil
      end
    end

    def perform(payload)
      event = event_store.deserialize(**payload, serializer: ActiveEventStore.config.serializer)

      event_store.with_metadata(**event.metadata.to_h) do
        subscriber.call(event)
      end
    end

    private

    def subscriber
      if self.class.subscriber.is_a?(Class) && !self.class.subscriber.respond_to?(:call)
        return self.class.subscriber.new
      end

      self.class.subscriber
    end

    def event_store
      ActiveEventStore.event_store
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
active_event_store-1.0.2 lib/active_event_store/subscriber_job.rb