lib/water_drop/producer_proxy.rb in waterdrop-0.1.10 vs lib/water_drop/producer_proxy.rb in waterdrop-0.1.11
- old
+ new
@@ -7,17 +7,21 @@
class ProducerProxy
# How long should be object considered alive if nothing is being
# send using it. After that time, we will recreate the connection
LIFE_TIME = 5 * 60 # 5 minute
- # How often should we refresh meta data from Kafka
- METADATA_REFRESH_INTERVAL = 5 * 60 # 5 minute
+ # All default poseidon parameters that we want to use
+ POSEIDON_PARAMS = {
+ metadata_refresh_interval_ms: 5 * 60 * 1000, # 5 minutes
+ # @see https://kafka.apache.org/08/configuration.html
+ # Security level for producer
+ required_acks: -1,
+ # @see https://issues.apache.org/jira/browse/KAFKA-1494
+ retry_backoff_ms: 1000,
+ max_send_retries: 5
+ }
- # @see https://kafka.apache.org/08/configuration.html
- # Security level for producer
- REQUIRED_ACKS = -1
-
# @return [WaterDrop::ProducerProxy] proxy object to Poseidon::Producer
# @note To ignore @last_usage nil case - we just assume that it is being
# first used when we create it
def initialize
touch
@@ -51,11 +55,10 @@
reload! if dead?
# Metadata refresh interval needs to be in miliseconds
@producer ||= Poseidon::Producer.new(
::WaterDrop.config.kafka_hosts,
producer_id,
- metadata_refresh_interval_ms: METADATA_REFRESH_INTERVAL * 1000,
- required_acks: REQUIRED_ACKS
+ POSEIDON_PARAMS
)
end
# @return [String] random unique id for producer
def producer_id