Sha256: 6156aa162e7a571cd5a0a46fb4ef56d5215874480669b6224d20c774a06002c5

Contents?: true

Size: 1.48 KB

Versions: 2

Compression:

Stored size: 1.48 KB

Contents

# frozen_string_literal: true

module WaterDrop
  # Message class that encapsulates the logic of a single Kafka message and its delivery
  class Message
    attr_reader :topic, :message, :options

    # @param topic [String, Symbol] a topic to which we want to send a message
    # @param message [Object] any object that can be serialized to a JSON string or
    #   that can be cast to a string
    # @param options [Hash] (optional) additional options to pass to the Kafka producer
    # @return [WaterDrop::Message] WaterDrop message instance
    # @example Creating a new message
    #   WaterDrop::Message.new(topic, message)
    def initialize(topic, message, options = {})
      @topic = topic.to_s
      @message = message
      @options = options
    end

    # Sends the current message to Kafka
    # @note Won't send any messages if the send_messages config flag is set to false
    # @example Send a message
    #   WaterDrop::Message.new(topic, message).send!
    def send!
      return true unless ::WaterDrop.config.send_messages

      Pool.with { |producer| producer.send_message(self) }

      ::WaterDrop.logger.info("Message #{message} was sent to topic '#{topic}'")
    rescue StandardError => e
      # Even if we don't re-raise this exception, we still log that it happened
      ::WaterDrop.logger.error(e)
      # Re-raise only when raise_on_failure is enabled;
      # otherwise swallow the error after logging it
      return unless ::WaterDrop.config.raise_on_failure
      raise(e)
    end
  end
end
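
The control flow of `send!` (skip when `send_messages` is off, log every failure, re-raise only when `raise_on_failure` is on) can be tried without a Kafka broker. What follows is a minimal sketch, not WaterDrop itself: the `SketchDrop` module, its `Config` struct, `FakeProducer`, and the `Pool.producer` accessor are all hypothetical stand-ins introduced for illustration, and only the body of `Message` mirrors the listing above.

```ruby
# frozen_string_literal: true

require 'logger'

# Hypothetical stand-in for the WaterDrop namespace (assumption, not the gem).
module SketchDrop
  # Only the two flags that Message#send! reads.
  Config = Struct.new(:send_messages, :raise_on_failure, keyword_init: true)

  class << self
    attr_accessor :config, :logger
  end

  # Defaults for the sketch; real WaterDrop configuration is done differently.
  self.config = Config.new(send_messages: true, raise_on_failure: false)
  self.logger = Logger.new($stderr)

  # Fake producer: records messages, or raises the error it was given.
  class FakeProducer
    attr_reader :sent

    def initialize(fail_with: nil)
      @fail_with = fail_with
      @sent = []
    end

    def send_message(message)
      raise @fail_with if @fail_with

      @sent << message
    end
  end

  # Fake pool: yields a single assignable producer instead of a real pool.
  module Pool
    class << self
      attr_accessor :producer

      def with
        yield producer
      end
    end
  end

  # Same shape as the Message class in the listing above.
  class Message
    attr_reader :topic, :message, :options

    def initialize(topic, message, options = {})
      @topic = topic.to_s
      @message = message
      @options = options
    end

    def send!
      # Delivery disabled: report success without touching the producer
      return true unless SketchDrop.config.send_messages

      Pool.with { |producer| producer.send_message(self) }
      SketchDrop.logger.info("Message #{message} was sent to topic '#{topic}'")
    rescue StandardError => e
      # Always log the failure, then re-raise only if configured to do so
      SketchDrop.logger.error(e)
      return unless SketchDrop.config.raise_on_failure

      raise(e)
    end
  end
end
```

With `SketchDrop::Pool.producer = SketchDrop::FakeProducer.new(fail_with: RuntimeError.new('boom'))`, calling `SketchDrop::Message.new(:events, 'payload').send!` logs the error and returns `nil` while `raise_on_failure` is false, and raises the same `RuntimeError` once the flag is set to true.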

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
waterdrop-0.4.0 lib/water_drop/message.rb
waterdrop-0.3.2.4 lib/water_drop/message.rb