lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.4 vs lib/logstash/inputs/nsq.rb in logstash-input-nsq-1.0.5
- old
+ new
@@ -9,27 +9,44 @@
config :nsqlookupd, :validate => :array, :default => 'localhost:4161'
config :channel, :validate => :string, :default => 'logstash'
config :topic, :validate => :string, :default => 'testtopic'
config :max_in_flight, :validate => :number, :default => 100
+ config :tls_v1, :validate => :boolean, :default => false
+ config :tls_key, :validate => :string
+ config :tls_cert, :validate => :string
-
public
def register
@logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
end # def register
public
def run(logstash_queue)
@logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
begin
begin
- consumer = Nsq::Consumer.new(
+ if @tls_key and @tls_cert
+ consumer = Nsq::Consumer.new(
+ :nsqlookupd => @nsqlookupd,
+ :topic => @topic,
+ :channel => @channel,
+ :max_in_flight => @max_in_flight,
+ :tls_v1 => @tls_v1,
+ :tls_context => {
+ key: @ssl_key,
+ certificate: @ssl_cert
+ }
+ )
+ else
+ consumer = Nsq::Consumer.new(
:nsqlookupd => @nsqlookupd,
:topic => @topic,
:channel => @channel,
+ :tls_v1 => @tls_v1,
:max_in_flight => @max_in_flight
- )
+ )
+ end
while true
#@logger.info('consuming...')
event = consumer.pop
#@logger.info('processing:', :event => event)
queue_event(event.body, logstash_queue)