# encoding: utf-8 require "logstash/inputs/threadable" require "logstash/namespace" require "logstash/inputs/rabbitmq/bunny" # Pull events from a RabbitMQ exchange. # # The default settings will create an entirely transient queue and listen for all messages by default. # If you need durability or any other advanced settings, please set the appropriate options # # This has been tested with Bunny 1.7.x, which supports RabbitMQ 2.x and 3.x. You can # find links to both here: # # * Bunny - class LogStash::Inputs::Bunny < LogStash::Inputs::Threadable config_name "bunny" # # Connection # # RabbitMQ server address config :host, :validate => :string, :required => true # RabbitMQ port to connect on config :port, :validate => :number, :default => 5672 # RabbitMQ username config :user, :validate => :string, :default => "guest" # RabbitMQ password config :password, :validate => :password, :default => "guest" # The vhost to use. If you don't know what this is, leave the default. config :vhost, :validate => :string, :default => "/" # Enable or disable SSL config :ssl, :validate => :boolean, :default => false config :ssl_cert, :validate => :string config :ssl_key, :validate => :string config :ssl_ca_cert, :validate => :string # Validate SSL certificate config :verify_ssl, :validate => :boolean, :default => false config :verify_peer, :validate => :boolean, :default => false # Enable or disable logging config :debug, :validate => :boolean, :default => false, :deprecated => "Use the logstash --debug flag for this instead." # # Queue & Consumer # # The name of the queue Logstash will consume events from. config :queue, :validate => :string, :default => "" # Is this queue durable? (aka; Should it survive a broker restart?) config :durable, :validate => :boolean, :default => false # Should the queue be deleted on the broker when the last consumer # disconnects? Set this option to `false` if you want the queue to remain # on the broker, queueing up messages until a consumer comes along to # consume them. config :auto_delete, :validate => :boolean, :default => false # Is the queue exclusive? Exclusive queues can only be used by the connection # that declared them and will be deleted when it is closed (e.g. due to a Logstash # restart). config :exclusive, :validate => :boolean, :default => false # Extra queue arguments as an array. # To make a RabbitMQ queue mirrored, use: `{"x-ha-policy" => "all"}` config :arguments, :validate => :array, :default => {} # Prefetch count. Number of messages to prefetch config :prefetch_count, :validate => :number, :default => 256 # Enable message acknowledgement config :ack, :validate => :boolean, :default => true # Passive queue creation? Useful for checking queue existance without modifying server state config :passive, :validate => :boolean, :default => false # # (Optional) Exchange binding # # Optional. # # The name of the exchange to bind the queue to. config :exchange, :validate => :string # Optional. # # The routing key to use when binding a queue to the exchange. # This is only relevant for direct or topic exchanges. # # * Routing keys are ignored on fanout exchanges. # * Wildcards are not valid on direct exchanges. config :key, :validate => :string, :default => "logstash" def initialize(params) params["codec"] = "json" if !params["codec"] super end include ::LogStash::Inputs::RabbitMQ::BunnyImpl def run(output_queue) @output_queue = output_queue begin setup consume rescue Bunny::NetworkFailure, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::TCPConnectionFailed => e n = Bunny::Session::DEFAULT_NETWORK_RECOVERY_INTERVAL * 2 # Because we manually reconnect instead of letting Bunny # handle failures, # make sure we don't leave any consumer work pool # threads behind. MK. @ch.maybe_kill_consumer_work_pool! @logger.error("RabbitMQ[#{input_name}] connection error: #{e.message}. Will attempt to reconnect in #{n} seconds...") sleep n retry rescue OpenSSL::SSL::SSLError => e n = Bunny::Session::DEFAULT_NETWORK_RECOVERY_INTERVAL * 2 @logger.error("RabbitMQ[#{input_name}] SSL connection error: #{e.message}. Will attempt to reconnect in #{n} seconds...") sleep n retry rescue LogStash::ShutdownSignal # ignore and quit rescue Exception => e @logger.error("RabbitMQ[#{input_name}] unhandled exception: #{e.inspect}") @logger.error(e.backtrace) end end def register super if @ssl and @ssl_cert @settings[:tls_cert] = @ssl_cert @settings[:tls_key] = @ssl_key @settings[:tls_ca_certificates] = [@ssl_ca_cert] if @ssl_ca_cert @settings[:verify_peer] = @verify_peer end end def teardown @consumer.cancel if @consumer @ch.close if @ch && @ch.open? @conn.close if @conn && @conn.open? finished end protected def consumer_id @consumer_id ||= "#{`hostname -s`.strip}-#{Thread.current.object_id}" end def input_name @exchange.nil? or @key.nil? ? queue : "#{@exchange}:#{@key}" end def setup return if terminating? begin attempts ||= 3 @logger.debug("RabbitMQ[#{input_name}] connecting.") @conn = Bunny.new(@settings) @conn.start rescue OpenSSL::SSL::SSLError @logger.warn("SSL Handshake failed, retrying.") attempts -= 1 retry unless attempts == 0 end @logger.info("RabbitMQ[#{input_name}] connected at #{@settings[:host]}") @ch = @conn.create_channel.tap do |ch| ch.prefetch(@prefetch_count) end @arguments_hash = Hash[*@arguments] @q = @ch.queue(@queue, :durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :passive => @passive, :arguments => @arguments) # exchange binding is optional for the input if @exchange @q.bind(@exchange, :routing_key => @key) end end def consume @logger.info("Will consume events from queue #{@q.name} as #{consumer_id}") no_ack = !@ack exclusive = @exclusive @consumer = Bunny::Consumer.new(@ch, @q, consumer_id, no_ack, exclusive) @consumer.on_delivery do |delivery_info, properties, data| @codec.decode(data) do |event| decorate(event) @output_queue << event end @ch.acknowledge(delivery_info.delivery_tag) if @ack end @q.subscribe_with(@consumer, block: true) end end # class LogStash::Inputs::RabbitMQ