lib/logstash/outputs/redis.rb in logstash-output-redis-2.0.2 vs lib/logstash/outputs/redis.rb in logstash-output-redis-2.0.4
- old
+ new
@@ -16,10 +16,12 @@
include Stud::Buffer
config_name "redis"
+ default :codec, "json"
+
# Name is used for logging in case there are multiple instances.
# TODO: delete
config :name, :validate => :string, :default => 'default',
:deprecated => true
@@ -135,51 +137,27 @@
@host.shuffle!
end
@host_idx = 0
@congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }
+
+ @codec.on_event(&method(:send_to_redis))
end # def register
def receive(event)
- if @batch and @data_type == 'list' # Don't use batched method for pubsub.
- # Stud::Buffer
- buffer_receive(event.to_json, event.sprintf(@key))
- return
- end
-
- key = event.sprintf(@key)
# TODO(sissel): We really should not drop an event, but historically
# we have dropped events that fail to be converted to json.
# TODO(sissel): Find a way to continue passing events through even
# if they fail to convert properly.
begin
- payload = event.to_json
- rescue Encoding::UndefinedConversionError, ArgumentError
- puts "FAILUREENCODING"
- @logger.error("Failed to convert event to JSON. Invalid UTF-8, maybe?",
- :event => event.inspect)
- return
+ @codec.encode(event)
+ rescue StandardError => e
+ @logger.warn("Error encoding event", :exception => e,
+ :event => event)
end
-
- begin
- @redis ||= connect
- if @data_type == 'list'
- congestion_check(key)
- @redis.rpush(key, payload)
- else
- @redis.publish(key, payload)
- end
- rescue => e
- @logger.warn("Failed to send event to Redis", :event => event,
- :identity => identity, :exception => e,
- :backtrace => e.backtrace)
- sleep @reconnect_interval
- @redis = nil
- retry
- end
end # def receive
def congestion_check(key)
return if @congestion_threshold == 0
if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check.
@@ -246,6 +224,33 @@
# A string used to identify a Redis instance in log messages
def identity
@name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
end
+ def send_to_redis(event, payload)
+ # How can I do this sort of thing with codecs?
+ key = event.sprintf(@key)
+
+ if @batch and @data_type == 'list' # Don't use batched method for pubsub.
+ # Stud::Buffer
+ buffer_receive(payload, key)
+ next
+ end
+
+ begin
+ @redis ||= connect
+ if @data_type == 'list'
+ congestion_check(key)
+ @redis.rpush(key, payload)
+ else
+ @redis.publish(key, payload)
+ end
+ rescue => e
+ @logger.warn("Failed to send event to Redis", :event => event,
+ :identity => identity, :exception => e,
+ :backtrace => e.backtrace)
+ sleep @reconnect_interval
+ @redis = nil
+ retry
+ end
+ end
end