Sha256: 6dee272c7b9c0f018a85fd8f447cde7bc1a56d6ef67f59ad36c9926325fefbb4
Contents?: true
Size: 1.67 KB
Versions: 3
Compression:
Stored size: 1.67 KB
Contents
require "amqp" # rubygem 'amqp'
require "logstash/outputs/base"
require "logstash/namespace"
require "mq" # rubygem 'amqp'

# Logstash output that publishes events to an AMQP exchange or queue.
#
# The destination is taken from the path of the output URL, which must have
# the form /<type>/<name>, where <type> is one of MQTYPES.
#
# NOTE(review): @url, @urlopts and @logger are never assigned in this file;
# presumably LogStash::Outputs::Base sets them up -- confirm against the
# parent class.
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
  # Destination kinds accepted as the first path segment of the URL.
  MQTYPES = [ "fanout", "queue", "topic" ].freeze

  # Validate the URL path and remember the destination type and name.
  #
  # All arguments (and the block) are forwarded unchanged to the parent
  # constructor via the bare `super`.
  #
  # Raises RuntimeError when the path has no type or no name segment, or when
  # the type is not listed in MQTYPES.
  public
  def initialize(url, config={}, &block)
    super

    # Handle path /<type>/<name>. The leading "/" makes the first element of
    # the split an empty string, which is discarded.
    _, @mqtype, @name = @url.path.split("/", 3)
    if @mqtype.nil? || @name.nil?
      raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
    end

    unless MQTYPES.include?(@mqtype)
      raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
    end
  end # def initialize

  # Connect to the broker named by the URL and create the publish target
  # (fanout exchange, queue, or topic exchange) selected by the URL path.
  public
  def register
    @logger.info("Registering output #{@url}")

    amqpsettings = {
      :host => @url.host,
      :port => (@url.port || 5672), # fall back to 5672 when the URL has no port
    }
    # Credentials are only passed along when the URL actually carries them.
    amqpsettings[:user] = @url.user if @url.user
    amqpsettings[:pass] = @url.password if @url.password

    @amqp = AMQP.connect(amqpsettings)
    @mq = MQ.new(@amqp)

    # A type outside MQTYPES would leave @target nil, but initialize already
    # rejects those.
    @target =
      case @mqtype
      when "fanout"
        @mq.fanout(@name)
      when "queue"
        # NOTE(review): any non-nil, non-false value enables durability here,
        # including the string "false" -- confirm how @urlopts is populated.
        @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
      when "topic"
        @mq.topic(@name)
      end # case @mqtype
  end # def register

  # Publish one event, serialized with event.to_json, to the target.
  #
  # Raises RuntimeError if no target has been set up (see ensure_target).
  public
  def receive(event)
    @logger.debug(["Sending event", { :url => @url, :event => event }])
    ensure_target
    @target.publish(event.to_json)
  end # def receive

  # Publish an already-serialized payload as-is.
  #
  # This is used by the ElasticSearch AMQP/River output.
  #
  # Raises RuntimeError if no target has been set up (see ensure_target).
  public
  def receive_raw(raw)
    ensure_target
    @target.publish(raw)
  end # def receive_raw

  # Fail with a descriptive error instead of a NoMethodError on nil when a
  # publish is attempted while @target is unset.
  private
  def ensure_target
    if @target.nil?
      raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil"
    end
  end # def ensure_target
end # class LogStash::Outputs::Amqp
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
logstash-lite-0.2.20110206003603 | lib/logstash/outputs/amqp.rb |
logstash-lite-0.2.20110203130400 | lib/logstash/outputs/amqp.rb |
logstash-lite-0.2.20110122143801 | lib/logstash/outputs/amqp.rb |