# encoding: utf-8
require "logstash/inputs/threadable"
require "logstash/namespace"
require "logstash/inputs/rabbitmq/bunny"

# Pull events from a RabbitMQ exchange.
#
# The default settings will create an entirely transient queue and listen for all messages by default.
# If you need durability or any other advanced settings, please set the appropriate options.
#
# This has been tested with Bunny 1.7.x, which supports RabbitMQ 2.x and 3.x. You can
# find the client library here:
#
# * Bunny - <https://github.com/ruby-amqp/bunny>
class LogStash::Inputs::Bunny < LogStash::Inputs::Threadable

  config_name "bunny"

  #
  # Connection
  #

  # RabbitMQ server address
  config :host, :validate => :string, :required => true

  # RabbitMQ port to connect on
  config :port, :validate => :number, :default => 5672

  # RabbitMQ username
  config :user, :validate => :string, :default => "guest"

  # RabbitMQ password
  config :password, :validate => :password, :default => "guest"

  # The vhost to use. If you don't know what this is, leave the default.
  config :vhost, :validate => :string, :default => "/"

  # RabbitMQ heartbeats. The string "server" is translated to the symbol
  # `:server` in `register` before being handed to the connection settings.
  # NOTE(review): an Array validator normally enumerates the literal values
  # that are allowed - confirm that `[:number, :string]` really accepts both
  # a number and the string "server".
  config :heartbeat, :validate => [:number, :string], :default => 0

  # Enable or disable SSL
  config :ssl, :validate => :boolean, :default => false

  # Passed to the connection as `:tls_cert`. The TLS settings below are only
  # applied when `ssl` is enabled and `ssl_cert` is set.
  config :ssl_cert, :validate => :string

  # Passed to the connection as `:tls_key`.
  config :ssl_key, :validate => :string

  # Passed to the connection as the single entry of `:tls_ca_certificates`
  # (skipped when not set).
  config :ssl_ca_cert, :validate => :string

  # Validate SSL certificate
  # NOTE(review): `verify_ssl` is not referenced in this file; presumably it
  # is consumed by BunnyImpl - confirm.
  config :verify_ssl, :validate => :boolean, :default => false

  # Passed to the connection as `:verify_peer` when the TLS settings are applied.
  config :verify_peer, :validate => :boolean, :default => false

  # Enable or disable logging
  config :debug, :validate => :boolean, :default => false, :deprecated => "Use the logstash --debug flag for this instead."

  #
  # Queue & Consumer
  #

  # The name of the queue Logstash will consume events from.
  config :queue, :validate => :string, :default => ""

  # Is this queue durable? (aka; Should it survive a broker restart?)
  config :durable, :validate => :boolean, :default => false

  # Should the queue be deleted on the broker when the last consumer
  # disconnects? Set this option to `false` if you want the queue to remain
  # on the broker, queueing up messages until a consumer comes along to
  # consume them.
  config :auto_delete, :validate => :boolean, :default => false

  # Is the queue exclusive? Exclusive queues can only be used by the connection
  # that declared them and will be deleted when it is closed (e.g. due to a Logstash
  # restart).
  config :exclusive, :validate => :boolean, :default => false

  # Extra queue arguments.
  # To make a RabbitMQ queue mirrored, use: `{"x-ha-policy" => "all"}`
  # NOTE(review): the validator is `:array` while both the default and the
  # documented example are hashes - confirm which shape is intended.
  config :arguments, :validate => :array, :default => {}

  # Prefetch count. Number of messages to prefetch
  config :prefetch_count, :validate => :number, :default => 256

  # Enable message acknowledgement
  config :ack, :validate => :boolean, :default => true

  # Passive queue creation? Useful for checking queue existence without modifying server state
  config :passive, :validate => :boolean, :default => false

  #
  # (Optional) Exchange binding
  #

  # Optional.
  #
  # The name of the exchange to bind the queue to.
  config :exchange, :validate => :string

  # Optional.
  #
  # The routing key to use when binding a queue to the exchange.
  # This is only relevant for direct or topic exchanges.
  #
  # * Routing keys are ignored on fanout exchanges.
  # * Wildcards are not valid on direct exchanges.
  config :key, :validate => :string, :default => "logstash"

  # Builds the input, defaulting the codec to "json" when none was supplied.
  #
  # params - the plugin settings Hash; it is modified in place when the
  #          "codec" entry is missing, then forwarded to the parent class.
  def initialize(params)
    params["codec"] = "json" unless params["codec"]

    super
  end

  # Included here, between `initialize` and `register`, exactly as in the
  # original; `register` below reaches its parent implementation via `super`.
  include ::LogStash::Inputs::RabbitMQ::BunnyImpl

  # Finishes plugin setup by adding heartbeat and TLS options to @settings.
  #
  # @settings is not created in this file - presumably the `super` call
  # (BunnyImpl#register) builds it; verify against that module.
  def register
    super

    # The config accepts the string "server"; the settings carry it as a symbol.
    @heartbeat = :server if @heartbeat == "server"
    @settings[:heartbeat] = @heartbeat

    # TLS material is only forwarded when SSL is on and a certificate was given.
    if @ssl && @ssl_cert
      @settings[:tls_cert] = @ssl_cert
      @settings[:tls_key] = @ssl_key
      @settings[:tls_ca_certificates] = [@ssl_ca_cert] if @ssl_ca_cert
      @settings[:verify_peer] = @verify_peer
    end
  end
end # class LogStash::Inputs::Bunny