lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.6 vs lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.7

- old
+ new

@@ -15,67 +15,64 @@ config :tls_key, :validate => :string config :tls_cert, :validate => :string public def register - @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) + @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) + if @tls_key and @tls_cert + @consumer = Nsq::Consumer.new( + :nsqlookupd => @nsqlookupd, + :topic => @topic, + :channel => @channel, + :max_in_flight => @max_in_flight, + :tls_v1 => @tls_v1, + :tls_context => { + key: @tls_key, + certificate: @tls_cert + } + ) + else + @consumer = Nsq::Consumer.new( + :nsqlookupd => @nsqlookupd, + :topic => @topic, + :channel => @channel, + :tls_v1 => @tls_v1, + :max_in_flight => @max_in_flight + ) + end end # def register public def run(logstash_queue) @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) begin - begin - if @tls_key and @tls_cert - consumer = Nsq::Consumer.new( - :nsqlookupd => @nsqlookupd, - :topic => @topic, - :channel => @channel, - :max_in_flight => @max_in_flight, - :tls_v1 => @tls_v1, - :tls_context => { - key: @tls_key, - certificate: @tls_cert - } - ) - else - consumer = Nsq::Consumer.new( - :nsqlookupd => @nsqlookupd, - :topic => @topic, - :channel => @channel, - :tls_v1 => @tls_v1, - :max_in_flight => @max_in_flight - ) - end - while true + while !stop? #@logger.info('consuming...') - event = consumer.pop + event = @consumer.pop #@logger.info('processing:', :event => event) queue_event(event.body, logstash_queue) - event.finish - end - rescue LogStash::ShutdownSignal - @logger.info('nsq got shutdown signal') - end - @logger.info('Done running nsq input') - rescue => e - @logger.warn('client threw exception, restarting', - :exception => e) - retry + event.finish + end + @logger.warn('Done running nsq input') end - finished end # def run private def queue_event(body, output_queue) begin #@logger.info('processing:', :body => body) - event = LogStash::Event.new("message" => body) - decorate(event) - output_queue << event - rescue => e # parse or event creation error - @logger.error('Failed to create event', :message => "#{body}", :exception => e, + event = LogStash::Event.new("message" => body) + decorate(event) + output_queue << event + rescue => e # parse or event creation error + @logger.error('Failed to create event', :message => "#{body}", :exception => e, :backtrace => e.backtrace) end # begin end # def queue_event + + public + def stop + @logger.warn('nsq got stop signal. terminate') + @consumer.terminate + end #stop end #class LogStash::Inputs::Nsq