lib/logstash/outputs/redis.rb in logstash-output-redis-2.0.2 vs lib/logstash/outputs/redis.rb in logstash-output-redis-2.0.4

- old
+ new

@@ -16,10 +16,12 @@ include Stud::Buffer config_name "redis" + default :codec, "json" + # Name is used for logging in case there are multiple instances. # TODO: delete config :name, :validate => :string, :default => 'default', :deprecated => true @@ -135,51 +137,27 @@ @host.shuffle! end @host_idx = 0 @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval } + + @codec.on_event(&method(:send_to_redis)) end # def register def receive(event) - if @batch and @data_type == 'list' # Don't use batched method for pubsub. - # Stud::Buffer - buffer_receive(event.to_json, event.sprintf(@key)) - return - end - - key = event.sprintf(@key) # TODO(sissel): We really should not drop an event, but historically # we have dropped events that fail to be converted to json. # TODO(sissel): Find a way to continue passing events through even # if they fail to convert properly. begin - payload = event.to_json - rescue Encoding::UndefinedConversionError, ArgumentError - puts "FAILUREENCODING" - @logger.error("Failed to convert event to JSON. Invalid UTF-8, maybe?", - :event => event.inspect) - return + @codec.encode(event) + rescue StandardError => e + @logger.warn("Error encoding event", :exception => e, + :event => event) end - - begin - @redis ||= connect - if @data_type == 'list' - congestion_check(key) - @redis.rpush(key, payload) - else - @redis.publish(key, payload) - end - rescue => e - @logger.warn("Failed to send event to Redis", :event => event, - :identity => identity, :exception => e, - :backtrace => e.backtrace) - sleep @reconnect_interval - @redis = nil - retry - end end # def receive def congestion_check(key) return if @congestion_threshold == 0 if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check. @@ -246,6 +224,33 @@ # A string used to identify a Redis instance in log messages def identity @name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}" end + def send_to_redis(event, payload) + # How can I do this sort of thing with codecs? + key = event.sprintf(@key) + + if @batch and @data_type == 'list' # Don't use batched method for pubsub. + # Stud::Buffer + buffer_receive(payload, key) + next + end + + begin + @redis ||= connect + if @data_type == 'list' + congestion_check(key) + @redis.rpush(key, payload) + else + @redis.publish(key, payload) + end + rescue => e + @logger.warn("Failed to send event to Redis", :event => event, + :identity => identity, :exception => e, + :backtrace => e.backtrace) + sleep @reconnect_interval + @redis = nil + retry + end + end end