Sha256: bc291df372b5ba93d65b9161681b8b2fe51a3a8e0eeddaf756bf072a7d6f7ac5

Contents?: true

Size: 1.94 KB

Versions: 1

Compression:

Stored size: 1.94 KB

Contents

class Journaled::Writer
  EVENT_METHOD_NAMES = %i(
    journaled_schema_name
    journaled_partition_key
    journaled_attributes
    journaled_stream_name
    journaled_enqueue_opts
  ).freeze

  def initialize(journaled_event:)
    raise "An enqueued event must respond to: #{EVENT_METHOD_NAMES.to_sentence}" unless respond_to_all?(journaled_event, EVENT_METHOD_NAMES)

    unless journaled_event.journaled_schema_name.present? &&
        journaled_event.journaled_partition_key.present? &&
        journaled_event.journaled_attributes.present?
      raise <<~ERROR
        An enqueued event must have a non-nil response to:
          #json_schema_name,
          #partition_key, and
          #journaled_attributes
      ERROR
    end

    @journaled_event = journaled_event
  end

  def journal!
    validate!
    ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: journaled_event, priority: job_opts[:priority]) do
      Journaled::DeliveryJob.set(job_opts).perform_later(**delivery_perform_args)
    end
  end

  private

  attr_reader :journaled_event

  delegate(*EVENT_METHOD_NAMES, to: :journaled_event)

  def validate!
    schema_validator('base_event').validate! serialized_event
    schema_validator('tagged_event').validate! serialized_event if journaled_event.tagged?
    schema_validator(journaled_schema_name).validate! serialized_event
  end

  def job_opts
    journaled_enqueue_opts.reverse_merge(priority: Journaled.job_priority)
  end

  def delivery_perform_args
    {
      serialized_event: serialized_event,
      partition_key: journaled_partition_key,
      stream_name: journaled_stream_name,
    }
  end

  def serialized_event
    @serialized_event ||= journaled_attributes.to_json
  end

  def schema_validator(schema_name)
    Journaled::JsonSchemaModel::Validator.new(schema_name)
  end

  def respond_to_all?(object, method_names)
    method_names.all? do |method_name|
      object.respond_to?(method_name)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
journaled-4.3.0 app/models/journaled/writer.rb