lib/water_drop/producer_proxy.rb in waterdrop-0.1.11 vs lib/water_drop/producer_proxy.rb in waterdrop-0.1.12
- old
+ new
@@ -7,26 +7,32 @@
class ProducerProxy
# How long should be object considered alive if nothing is being
# send using it. After that time, we will recreate the connection
LIFE_TIME = 5 * 60 # 5 minute
+ # If sending fails - how many times we should try with a new connection
+ # @note It works in a similar way to Poseidon internal max_send_retries option, but it will
+ # create a new connection after failure (Poseidon tries to use the same one)
+ MAX_SEND_RETRIES = 1
+
# All default poseidon parameters that we want to use
POSEIDON_PARAMS = {
metadata_refresh_interval_ms: 5 * 60 * 1000, # 5 minutes
# @see https://kafka.apache.org/08/configuration.html
# Security level for producer
required_acks: -1,
# @see https://issues.apache.org/jira/browse/KAFKA-1494
retry_backoff_ms: 1000,
- max_send_retries: 5
+ max_send_retries: 1
}
# @return [WaterDrop::ProducerProxy] proxy object to Poseidon::Producer
# @note To ignore @last_usage nil case - we just assume that it is being
# first used when we create it
def initialize
touch
+ @attempts = 0
end
# Sends messages to Kafka
# @param messages [Array<Poseidon::MessageToSend>] array with messages that we want to send
# @return [Boolean] were the messages send
@@ -38,10 +44,15 @@
def send_messages(messages)
touch
producer.send_messages(messages)
rescue StandardError => e
reload!
+
+ retry if (@attempts += 1) <= MAX_SEND_RETRIES
+
raise(e)
+ ensure
+ @attempts = 0
end
private
# Refreshes last usage value with current time