lib/water_drop/producer_proxy.rb in waterdrop-0.1.11 vs lib/water_drop/producer_proxy.rb in waterdrop-0.1.12

- old
+ new

@@ -7,26 +7,32 @@ class ProducerProxy # How long should be object considered alive if nothing is being # send using it. After that time, we will recreate the connection LIFE_TIME = 5 * 60 # 5 minute + # If sending fails - how many times we should try with a new connection + # @note It works in a similar way to Poseidon internal max_send_retries option, but it will + # create a new connection after failure (Poseidon tries to use the same one) + MAX_SEND_RETRIES = 1 + # All default poseidon parameters that we want to use POSEIDON_PARAMS = { metadata_refresh_interval_ms: 5 * 60 * 1000, # 5 minutes # @see https://kafka.apache.org/08/configuration.html # Security level for producer required_acks: -1, # @see https://issues.apache.org/jira/browse/KAFKA-1494 retry_backoff_ms: 1000, - max_send_retries: 5 + max_send_retries: 1 } # @return [WaterDrop::ProducerProxy] proxy object to Poseidon::Producer # @note To ignore @last_usage nil case - we just assume that it is being # first used when we create it def initialize touch + @attempts = 0 end # Sends messages to Kafka # @param messages [Array<Poseidon::MessageToSend>] array with messages that we want to send # @return [Boolean] were the messages send @@ -38,10 +44,15 @@ def send_messages(messages) touch producer.send_messages(messages) rescue StandardError => e reload! + + retry if (@attempts += 1) <= MAX_SEND_RETRIES + raise(e) + ensure + @attempts = 0 end private # Refreshes last usage value with current time