Sha256: 72388c491f2feb5f0b9aabb67db96226989c298bd72ebdeeec69fe6372849092
Contents?: true
Size: 1.87 KB
Versions: 2
Compression:
Stored size: 1.87 KB
Contents
require 'logstash/namespace' require 'logstash/inputs/base' require 'nsq' class LogStash::Inputs::Nsq < LogStash::Inputs::Base config_name 'nsq' default :codec, 'json' config :nsqlookupd, :validate => :array, :default => 'localhost:4161' config :channel, :validate => :string, :default => 'logstash' config :topic, :validate => :string, :default => 'testtopic' config :max_in_flight, :validate => :number, :default => 100 public def register @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) end # def register public def run(logstash_queue) @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) begin begin consumer = Nsq::Consumer.new( :nsqlookupd => @nsqlookupd, :topic => @topic, :channel => @channel, :max_in_flight => @max_in_flight ) while true #@logger.info('consuming...') event = consumer.pop #@logger.info('processing:', :event => event) queue_event(event.body, logstash_queue) event.finish end rescue LogStash::ShutdownSignal @logger.info('nsq got shutdown signal') end @logger.info('Done running nsq input') rescue => e @logger.warn('client threw exception, restarting', :exception => e) retry end finished end # def run private def queue_event(body, output_queue) begin #@logger.info('processing:', :body => body) event = LogStash::Event.new("message" => body) decorate(event) output_queue << event rescue => e # parse or event creation error @logger.error('Failed to create event', :message => "#{body}", :exception => e, :backtrace => e.backtrace) end # begin end # def queue_event end #class LogStash::Inputs::Nsq
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
logstash-input-nsq-1.0.4 | lib/logstash/inputs/nsq.rb |
logstash-input-nsq-1.0.3 | lib/logstash/inputs/nsq.rb |