Sha256: 0a09760ceb6aa80d432c1ba9b09592adb15155f565a8095998bf577a49402c02
Contents?: true
Size: 1.57 KB
Versions: 3
Compression:
Stored size: 1.57 KB
Contents
require "amqp" # rubygem 'amqp' require "logstash/inputs/base" require "logstash/namespace" require "mq" # rubygem 'amqp' require "uuidtools" # rubygem 'uuidtools' class LogStash::Inputs::Amqp < LogStash::Inputs::Base MQTYPES = [ "fanout", "queue", "topic" ] public def initialize(url, type, config={}, &block) super @mq = nil # Handle path /<type>/<name> unused, @mqtype, @name = @url.path.split("/", 3) if @mqtype == nil or @name == nil raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}" end if !MQTYPES.include?(@mqtype) raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.JOIN(", ")}" end end # def initialize public def register @logger.info("Registering input #{@url}") amqpsettings = { :host => @url.host, :port => (@url.port or 5672), } amqpsettings[:user] = @url.user if @url.user amqpsettings[:pass] = @url.password if @url.password @amqp = AMQP.connect(amqpsettings) @mq = MQ.new(@amqp) @target = nil @target = @mq.queue(UUIDTools::UUID.timestamp_create) case @mqtype when "fanout" #@target.bind(MQ.fanout(@url.path, :durable => true)) @target.bind(@mq.fanout(@name)) when "direct" @target.bind(@mq.direct(@name)) when "topic" @target.bind(@mq.topic(@name)) end # case @mqtype @target.subscribe(:ack => true) do |header, message| event = LogStash::Event.from_json(message) receive(event) header.ack end end # def register end # class LogStash::Inputs::Amqp
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
logstash-lite-0.2.20110206003603 | lib/logstash/inputs/amqp.rb |
logstash-lite-0.2.20110203130400 | lib/logstash/inputs/amqp.rb |
logstash-lite-0.2.20110122143801 | lib/logstash/inputs/amqp.rb |