lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.4 vs lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.5

- old
+ new

@@ -9,27 +9,44 @@ config :nsqlookupd, :validate => :array, :default => 'localhost:4161' config :channel, :validate => :string, :default => 'logstash' config :topic, :validate => :string, :default => 'testtopic' config :max_in_flight, :validate => :number, :default => 100 + config :tls_v1, :validate => :boolean, :default => false + config :tls_key, :validate => :string + config :tls_cert, :validate => :string - public def register @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) end # def register public def run(logstash_queue) @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd) begin begin - consumer = Nsq::Consumer.new( + if @tls_key and @tls_cert + consumer = Nsq::Consumer.new( + :nsqlookupd => @nsqlookupd, + :topic => @topic, + :channel => @channel, + :max_in_flight => @max_in_flight, + :tls_v1 => @tls_v1, + :tls_context => { + key: @ssl_key, + certificate: @ssl_cert + } + ) + else + consumer = Nsq::Consumer.new( :nsqlookupd => @nsqlookupd, :topic => @topic, :channel => @channel, + :tls_v1 => @tls_v1, :max_in_flight => @max_in_flight - ) + ) + end while true #@logger.info('consuming...') event = consumer.pop #@logger.info('processing:', :event => event) queue_event(event.body, logstash_queue)