Sha256: 57891fd6af8538a11900c76c9f135e62e7d68ea34653c038924e86f9f585db58
Contents?: true
Size: 1.33 KB
Versions: 11
Compression:
Stored size: 1.33 KB
Contents
require "logstash/inputs/base" require "amqp" # rubygem 'amqp' require "mq" # rubygem 'amqp' require "uuidtools" # rubygem 'uuidtools' class LogStash::Inputs::Amqp < LogStash::Inputs::Base MQTYPES = [ "fanout", "queue", "topic" ] def initialize(url, type, config={}, &block) super @mq = nil # Handle path /<type>/<name> unused, @mqtype, @name = @url.path.split("/", 3) if @mqtype == nil or @name == nil raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}" end if !MQTYPES.include?(@mqtype) raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.JOIN(", ")}" end end def register @logger.info("Registering input #{@url}") @amqp = AMQP.connect(:host => @url.host) @mq = MQ.new(@amqp) @target = nil @target = @mq.queue(UUIDTools::UUID.timestamp_create) case @mqtype when "fanout" #@target.bind(MQ.fanout(@url.path, :durable => true)) @target.bind(@mq.fanout(@name)) when "direct" @target.bind(@mq.direct(@name)) when "topic" @target.bind(@mq.topic(@name)) end # case @mqtype @target.subscribe(:ack => true) do |header, message| event = LogStash::Event.from_json(message) receive(event) header.ack end end # def register end # class LogStash::Inputs::Amqp
Version data entries
11 entries across 11 versions & 1 rubygems