Sha256: 2879e2bac2f6733a713047ac268f4f38edef0e075a271b15af747820547f5c39
Contents?: true
Size: 1.69 KB
Versions: 1
Compression:
Stored size: 1.69 KB
Contents
# frozen_string_literal: true

module ActiveEventStore
  # Base job for async subscribers.
  #
  # A dedicated subclass is generated per subscriber (see {.from}) and stored
  # as the +SubscriberJob+ constant directly under the subscriber module/class,
  # so that the job has a stable constant name.
  class SubscriberJob < ActiveJob::Base
    class << self
      # The subscriber (module/class) this job class dispatches events to.
      attr_accessor :subscriber

      # Returns the job class bound to +callable+, creating it on first use.
      #
      # @param callable [Module] a named module or class acting as subscriber
      # @return [Class] the +SubscriberJob+ constant defined on +callable+
      # @raise [ArgumentError] if +callable+ is a Proc, an anonymous module,
      #   or anything that is not a module/class
      def from(callable)
        # Type check first: arbitrary instances may not respond to #name,
        # which would otherwise surface as NoMethodError instead of ArgumentError.
        unless callable.is_a?(Proc) || callable.is_a?(Module)
          raise ArgumentError, "Async subscriber must be a module/class, not instance"
        end

        # A constant is set on the subscriber below, so it must be a named module/class.
        if callable.is_a?(Proc) || callable.name.nil?
          raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) " \
            "could not be asynchronous (use sync: true)"
        end

        # `false` => look only at constants defined on callable itself, not inherited ones.
        return callable.const_get(:SubscriberJob, false) if callable.const_defined?(:SubscriberJob, false)

        callable.const_set(
          :SubscriberJob,
          Class.new(self).tap do |job|
            # Configure the generated class itself; a bare `queue_as` here would
            # be sent to the parent job class and overwrite its queue name.
            job.queue_as ActiveEventStore.config.job_queue_name

            job.subscriber = callable
          end
        )
      end

      # Looks up the job class previously generated for +callable+.
      #
      # @param callable [Module] subscriber module or class
      # @return [Class, nil] the +SubscriberJob+ constant defined directly on
      #   +callable+, or nil when there is none
      # @raise [ArgumentError] if +callable+ is not a module/class
      def for(callable)
        raise ArgumentError, "Async subscriber must be a module/class" unless callable.is_a?(Module)

        callable.const_defined?(:SubscriberJob, false) ? callable.const_get(:SubscriberJob, false) : nil
      end
    end

    # Rebuilds the event from +payload+ and passes it to the subscriber while
    # the event's metadata is set on the event store.
    #
    # @param payload [Hash] serialized event attributes; splatted as keyword
    #   arguments into +event_store.deserialize+ (assumes symbol keys — TODO confirm)
    def perform(payload)
      event = event_store.deserialize(**payload, serializer: ActiveEventStore.config.serializer)

      event_store.with_metadata(**event.metadata.to_h) do
        subscriber.call(event)
      end
    end

    private

    # Resolves the object that receives +#call+: a class that does not itself
    # respond to +call+ is instantiated; anything else is used as is.
    def subscriber
      if self.class.subscriber.is_a?(Class) && !self.class.subscriber.respond_to?(:call)
        return self.class.subscriber.new
      end

      self.class.subscriber
    end

    # The event store configured for ActiveEventStore.
    def event_store
      ActiveEventStore.event_store
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
active_event_store-1.0.2 | lib/active_event_store/subscriber_job.rb |