Sha256: a6d110b6b6fcf3509a24a7a68457ac3d1ed2a633a5005b6adb9eaf5ae005b158

Contents?: true

Size: 1.39 KB

Versions: 3

Compression:

Stored size: 1.39 KB

Contents

# frozen_string_literal: true

module WaterDrop
  # Base messages producer that contains all the logic that is exactly the same for both
  # sync and async producers
  class BaseProducer
    class << self
      # Delivery boy method name that we use to invoke producer action
      attr_accessor :method_name

      # Performs message delivery using method_name method
      # @param message [String] message that we want to send to Kafka
      # @param options [Hash] options (including topic) for producer
      # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are
      #   somehow invalid and we cannot perform delivery because of that
      def call(message, options)
        validate!(options)
        return unless WaterDrop.config.deliver
        DeliveryBoy.public_send(method_name, message, options)
      end

      private

      # Runs the message options validations and raises an error if anything is invalid
      # @param options [Hash] hash that we want to validate
      # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are
      #   somehow invalid and we cannot perform delivery because of that
      def validate!(options)
        validation_result = Schemas::MessageOptions.call(options)
        return true if validation_result.success?
        raise Errors::InvalidMessageOptions, validation_result.errors
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
waterdrop-1.0.0 lib/water_drop/base_producer.rb
waterdrop-1.0.0.alpha2 lib/water_drop/base_producer.rb
waterdrop-1.0.0.alpha1 lib/water_drop/base_producer.rb