lib/logstash/outputs/redis.rb in logstash-output-redis-0.1.0 vs lib/logstash/outputs/redis.rb in logstash-output-redis-0.1.1
- old
+ new
@@ -8,11 +8,11 @@
# PUBLISH to a channel requires at least v1.3.8+.
# While you may be able to make these Redis versions work,
# the best performance and stability will be found in more
# recent stable versions. Versions 2.6.0+ are recommended.
#
-# For more information about Redis, see <http://redis.io/>
+# For more information, see http://redis.io/[the Redis homepage]
#
class LogStash::Outputs::Redis < LogStash::Outputs::Base
include Stud::Buffer
@@ -26,11 +26,11 @@
# The hostname(s) of your Redis server(s). Ports may be specified on any
# hostname, which will override the global port config.
#
# For example:
- #
+ # [source,ruby]
# "127.0.0.1"
# ["127.0.0.1", "127.0.0.2"]
# ["127.0.0.1:6380", "127.0.0.1"]
config :host, :validate => :array, :default => ["127.0.0.1"]
@@ -48,16 +48,16 @@
# Password to authenticate with. There is no authentication by default.
config :password, :validate => :password
# The name of the Redis queue (we'll use RPUSH on this). Dynamic names are
- # valid here, for example "logstash-%{type}"
+ # valid here, for example `logstash-%{type}`
# TODO: delete
config :queue, :validate => :string, :deprecated => true
# The name of a Redis list or channel. Dynamic names are
- # valid here, for example "logstash-%{type}".
+ # valid here, for example `logstash-%{type}`.
# TODO set required true
config :key, :validate => :string, :required => false
# Either list or channel. If `redis_type` is list, then we will set
# RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`.
@@ -81,11 +81,11 @@
config :batch_timeout, :validate => :number, :default => 5
# Interval for reconnecting to failed Redis connections
config :reconnect_interval, :validate => :number, :default => 1
- # In case Redis `data_type` is "list" and has more than @congestion_threshold items,
+ # In case Redis `data_type` is `list` and has more than `@congestion_threshold` items,
# block until someone consumes them and reduces congestion, otherwise if there are
# no consumers Redis will run out of memory, unless it was configured with OOM protection.
# But even with OOM protection, a single Redis list can block all other users of Redis,
# until Redis CPU consumption reaches the max allowed RAM size.
# A default value of 0 means that this limit is disabled.
@@ -186,10 +186,10 @@
if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check.
while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold.
@logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
sleep @congestion_interval
end
- @congestion_check_time = Time.now.to_i
+ @congestion_check_times[key] = Time.now.to_i
end
end
# called from Stud::Buffer#buffer_flush when there are events to flush
def flush(events, key, teardown=false)