Sha256: 2ba68e463dfa271f6310308eaa990fb812c23f46cf6f992365d8473f8ff0a101

Contents?: true

Size: 1.32 KB

Versions: 5

Compression:

Stored size: 1.32 KB

Contents

require "logstash/inputs/base"
require "amqp" # rubygem 'amqp'
require "mq" # rubygem 'amqp'
require "uuidtools" # rubygem 'uuidtools'

# Logstash input that receives events from an AMQP broker.
#
# The source URL's path selects what to listen on and must have the form
# /<type>/<name>, where <type> is one of MQTYPES and <name> is the name of
# the exchange the input's queue is bound to.
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
  # Types accepted as the first path segment of the URL.
  # "direct" is listed because #register has a binding branch for it; "queue"
  # is kept so URLs that were accepted before are still accepted.
  MQTYPES = [ "fanout", "direct", "queue", "topic" ].freeze

  # Validates the URL path and remembers the exchange type and name.
  #
  # All arguments (and the block) are forwarded unchanged to the base class.
  # NOTE(review): @url and @logger are not assigned in this class, so they are
  # presumably set up by LogStash::Inputs::Base#initialize -- confirm.
  #
  # Raises RuntimeError when the path is not /<type>/<name> or when <type>
  # is not one of MQTYPES.
  def initialize(url, type, config={}, &block)
    super

    @mq = nil

    # Handle path /<type>/<name>. The path starts with "/", so the first
    # element of the split is the empty string before it and is discarded.
    _, @mqtype, @name = @url.path.split("/", 3)
    if @mqtype.nil? || @name.nil?
      raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
    end

    unless MQTYPES.include?(@mqtype)
      raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
    end
  end

  # Connects to the broker host named in the URL, creates a queue named by a
  # freshly generated timestamp UUID, binds it to the configured exchange and
  # subscribes to it. Every received message is turned into an event via
  # LogStash::Event.from_json and passed to #receive.
  def register
    @logger.info("Registering #{@url}")
    @amqp = AMQP.connect(:host => @url.host)
    @mq = MQ.new(@amqp)

    @target = @mq.queue(UUIDTools::UUID.timestamp_create)
    case @mqtype
    when "fanout"
      @target.bind(@mq.fanout(@name))
    when "direct"
      @target.bind(@mq.direct(@name))
    when "topic"
      @target.bind(@mq.topic(@name))
    # NOTE(review): "queue" passes validation but has no branch here, so the
    # queue created above is left without any explicit binding -- confirm
    # what a "queue" URL is supposed to subscribe to.
    end # case @mqtype

    @target.subscribe(:ack => true) do |header, message|
      event = LogStash::Event.from_json(message)
      receive(event)
      # Acknowledge only after the event has been handed to #receive.
      header.ack
    end
  end # def register
end # class LogStash::Inputs::Amqp

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
logstash-lite-0.2.20101120024757 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101120021802 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101119183130 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101118141920 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101118134500 lib/logstash/inputs/amqp.rb