Sha256: 00531c4602d9f948a59eadd6d2b9c9b20c5015dcb69ba1b113b99d9223757d31

Contents?: true

Size: 1.45 KB

Versions: 2

Compression:

Stored size: 1.45 KB

Contents

module WaterDrop
  # Message class that encapsulates the logic of a single Kafka message and its delivery
  class Message
    attr_reader :topic, :message

    # @param topic [String, Symbol] a topic to which we want to send a message
    # @param message [Object] any object that can be serialized to a JSON string or
    #   that can be cast to a string
    # @return [WaterDrop::Message] WaterDrop message instance
    # @example Creating a new message
    #   WaterDrop::Message.new(topic, message)
    def initialize(topic, message)
      @topic = topic.to_s
      @message = message.respond_to?(:to_json) ? message.to_json : message.to_s
    end

    # Sends the current message to Kafka
    # @note Won't send any messages if the send_messages config flag is set to false
    # @example Send a message
    #   WaterDrop::Message.new(topic, message).send!
    def send!
      return true unless ::WaterDrop.config.send_messages?

      Pool.with do |producer|
        producer.send_messages([
          Poseidon::MessageToSend.new(topic, message)
        ])
      end
      ::WaterDrop.logger.info("Message #{message} was sent to topic '#{topic}'")
    rescue StandardError => e
      # Even if we don't reraise this exception, we still log that it happened
      ::WaterDrop.logger.error(e)
      # Reraise only when the raise_on_failure config flag is set;
      # otherwise swallow the error after logging it
      return unless ::WaterDrop.config.raise_on_failure?
      raise(e)
    end
  end
end
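
The two behaviours in the class above, payload serialization in `initialize` and the log-then-maybe-reraise flow in `send!`, can be tried without Kafka. This is a minimal sketch under stated assumptions, not the gem's code. `SketchConfig`, `serialize_payload`, `deliver` and `PlainEvent` are hypothetical names introduced only for illustration; the block passed to `deliver` stands in for the producer call; and, unlike `send!`, the sketch returns an explicit `true` or `false`.

```ruby
require 'json'
require 'logger'

# Hypothetical stand-in for ::WaterDrop.config; only the two flags
# consulted by send! are modelled here.
SketchConfig = Struct.new(:send_messages, :raise_on_failure)

# Mirrors the payload rule from Message#initialize.
# Once 'json' is loaded, every Object responds to #to_json, so a plain
# String is JSON-encoded (wrapped in quotes) rather than passed to #to_s.
def serialize_payload(message)
  message.respond_to?(:to_json) ? message.to_json : message.to_s
end

# Mirrors the control flow of Message#send!. The block stands in for the
# producer call. Assumption: explicit true/false return values are a
# choice made for this sketch only.
def deliver(config, logger)
  return true unless config.send_messages

  yield
  true
rescue StandardError => e
  # Always log, even when the error is not reraised
  logger.error(e)
  return false unless config.raise_on_failure

  raise
end

# Hypothetical payload type without #to_json, to exercise the #to_s fallback.
class PlainEvent
  undef_method :to_json

  def to_s
    'plain-event'
  end
end
```

With `json` loaded, `serialize_payload({ id: 1 })` gives `{"id":1}`, while `serialize_payload('hi')` gives `"hi"` including the quotes, so string payloads reach the topic JSON-encoded. Calling `deliver(SketchConfig.new(true, false), Logger.new($stderr)) { raise 'boom' }` logs the error and returns `false`; with `raise_on_failure` set to `true` the same call reraises.
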

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
waterdrop-0.1.11 lib/water_drop/message.rb
waterdrop-0.1.10 lib/water_drop/message.rb