Sha256: 00531c4602d9f948a59eadd6d2b9c9b20c5015dcb69ba1b113b99d9223757d31
Contents?: true
Size: 1.45 KB
Versions: 2
Compression:
Stored size: 1.45 KB
Contents
module WaterDrop # Message class which encapsulate single Kafka message logic and its delivery class Message attr_reader :topic, :message # @param topic [String, Symbol] a topic to which we want to send a message # @param message [Object] any object that can be serialized to a JSON string or # that can be casted to a string # @return [WaterDrop::Message] WaterDrop message instance # @example Creating a new message # WaterDrop::Message.new(topic, message) def initialize(topic, message) @topic = topic.to_s @message = message.respond_to?(:to_json) ? message.to_json : message.to_s end # Sents a current message to Kafka # @note Won't send any messages if send_messages config flag is set to false # @example Set a message # WaterDrop::Message.new(topic, message).send! def send! return true unless ::WaterDrop.config.send_messages? Pool.with do |producer| producer.send_messages([ Poseidon::MessageToSend.new(topic, message) ]) end ::WaterDrop.logger.info("Message #{message} was sent to topic '#{topic}'") rescue StandardError => e # Even if we dont reraise this exception, it should log that it happened ::WaterDrop.logger.error(e) # Reraise if we want to raise on failure # Ignore if we dont want to know that something went wrong return unless ::WaterDrop.config.raise_on_failure? raise(e) end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
waterdrop-0.1.11 | lib/water_drop/message.rb |
waterdrop-0.1.10 | lib/water_drop/message.rb |