lib/logstash/outputs/redis.rb in logstash-output-redis-0.1.0 vs lib/logstash/outputs/redis.rb in logstash-output-redis-0.1.1

- old
+ new

@@ -8,11 +8,11 @@ # PUBLISH to a channel requires at least v1.3.8+. # While you may be able to make these Redis versions work, # the best performance and stability will be found in more # recent stable versions. Versions 2.6.0+ are recommended. # -# For more information about Redis, see <http://redis.io/> +# For more information, see http://redis.io/[the Redis homepage] # class LogStash::Outputs::Redis < LogStash::Outputs::Base include Stud::Buffer @@ -26,11 +26,11 @@ # The hostname(s) of your Redis server(s). Ports may be specified on any # hostname, which will override the global port config. # # For example: - # + # [source,ruby] # "127.0.0.1" # ["127.0.0.1", "127.0.0.2"] # ["127.0.0.1:6380", "127.0.0.1"] config :host, :validate => :array, :default => ["127.0.0.1"] @@ -48,16 +48,16 @@ # Password to authenticate with. There is no authentication by default. config :password, :validate => :password # The name of the Redis queue (we'll use RPUSH on this). Dynamic names are - # valid here, for example "logstash-%{type}" + # valid here, for example `logstash-%{type}` # TODO: delete config :queue, :validate => :string, :deprecated => true # The name of a Redis list or channel. Dynamic names are - # valid here, for example "logstash-%{type}". + # valid here, for example `logstash-%{type}`. # TODO set required true config :key, :validate => :string, :required => false # Either list or channel. If `redis_type` is list, then we will set # RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. @@ -81,11 +81,11 @@ config :batch_timeout, :validate => :number, :default => 5 # Interval for reconnecting to failed Redis connections config :reconnect_interval, :validate => :number, :default => 1 - # In case Redis `data_type` is "list" and has more than @congestion_threshold items, + # In case Redis `data_type` is `list` and has more than `@congestion_threshold` items, # block until someone consumes them and reduces congestion, otherwise if there are # no consumers Redis will run out of memory, unless it was configured with OOM protection. # But even with OOM protection, a single Redis list can block all other users of Redis, # until Redis CPU consumption reaches the max allowed RAM size. # A default value of 0 means that this limit is disabled. @@ -186,10 +186,10 @@ if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check. while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold. @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds") sleep @congestion_interval end - @congestion_check_time = Time.now.to_i + @congestion_check_times[key] = Time.now.to_i end end # called from Stud::Buffer#buffer_flush when there are events to flush def flush(events, key, teardown=false)