lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.6 vs lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.7
- old
+ new
@@ -15,67 +15,64 @@
config :tls_key, :validate => :string
config :tls_cert, :validate => :string
public
def register
- @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
+ @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
+ if @tls_key and @tls_cert
+ @consumer = Nsq::Consumer.new(
+ :nsqlookupd => @nsqlookupd,
+ :topic => @topic,
+ :channel => @channel,
+ :max_in_flight => @max_in_flight,
+ :tls_v1 => @tls_v1,
+ :tls_context => {
+ key: @tls_key,
+ certificate: @tls_cert
+ }
+ )
+ else
+ @consumer = Nsq::Consumer.new(
+ :nsqlookupd => @nsqlookupd,
+ :topic => @topic,
+ :channel => @channel,
+ :tls_v1 => @tls_v1,
+ :max_in_flight => @max_in_flight
+ )
+ end
end # def register
public
def run(logstash_queue)
@logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
begin
- begin
- if @tls_key and @tls_cert
- consumer = Nsq::Consumer.new(
- :nsqlookupd => @nsqlookupd,
- :topic => @topic,
- :channel => @channel,
- :max_in_flight => @max_in_flight,
- :tls_v1 => @tls_v1,
- :tls_context => {
- key: @tls_key,
- certificate: @tls_cert
- }
- )
- else
- consumer = Nsq::Consumer.new(
- :nsqlookupd => @nsqlookupd,
- :topic => @topic,
- :channel => @channel,
- :tls_v1 => @tls_v1,
- :max_in_flight => @max_in_flight
- )
- end
- while true
+ while !stop?
#@logger.info('consuming...')
- event = consumer.pop
+ event = @consumer.pop
#@logger.info('processing:', :event => event)
queue_event(event.body, logstash_queue)
- event.finish
- end
- rescue LogStash::ShutdownSignal
- @logger.info('nsq got shutdown signal')
- end
- @logger.info('Done running nsq input')
- rescue => e
- @logger.warn('client threw exception, restarting',
- :exception => e)
- retry
+ event.finish
+ end
+ @logger.warn('Done running nsq input')
end
- finished
end # def run
private
def queue_event(body, output_queue)
begin
#@logger.info('processing:', :body => body)
- event = LogStash::Event.new("message" => body)
- decorate(event)
- output_queue << event
- rescue => e # parse or event creation error
- @logger.error('Failed to create event', :message => "#{body}", :exception => e,
+ event = LogStash::Event.new("message" => body)
+ decorate(event)
+ output_queue << event
+ rescue => e # parse or event creation error
+ @logger.error('Failed to create event', :message => "#{body}", :exception => e,
:backtrace => e.backtrace)
end # begin
end # def queue_event
+
+ public
+ def stop
+ @logger.warn('nsq got stop signal. terminate')
+ @consumer.terminate
+ end #stop
end #class LogStash::Inputs::Nsq