Sha256: 57891fd6af8538a11900c76c9f135e62e7d68ea34653c038924e86f9f585db58

Contents?: true

Size: 1.33 KB

Versions: 11

Compression:

Stored size: 1.33 KB

Contents

require "logstash/inputs/base"
require "amqp" # rubygem 'amqp'
require "mq" # rubygem 'amqp'
require "uuidtools" # rubygem 'uuidtools'

class LogStash::Inputs::Amqp < LogStash::Inputs::Base
  MQTYPES = [ "fanout", "queue", "topic" ]

  def initialize(url, type, config={}, &block)
    super

    @mq = nil

    # Handle path /<type>/<name>
    unused, @mqtype, @name = @url.path.split("/", 3)
    if @mqtype == nil or @name == nil
      raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
    end

    if !MQTYPES.include?(@mqtype)
      raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.JOIN(", ")}"
    end
  end

  def register
    @logger.info("Registering input #{@url}")
    @amqp = AMQP.connect(:host => @url.host)
    @mq = MQ.new(@amqp)
    @target = nil

    @target = @mq.queue(UUIDTools::UUID.timestamp_create)
    case @mqtype
      when "fanout"
        #@target.bind(MQ.fanout(@url.path, :durable => true))
        @target.bind(@mq.fanout(@name))
      when "direct"
        @target.bind(@mq.direct(@name))
      when "topic"
        @target.bind(@mq.topic(@name))
    end # case @mqtype

    @target.subscribe(:ack => true) do |header, message|
      event = LogStash::Event.from_json(message)
      receive(event)
      header.ack
    end
  end # def register
end # class LogStash::Inputs::Amqp

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
logstash-lite-0.2.20101222161646 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101208111718 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101207114354 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101201111523 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101129210156 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101129205551 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101129155412 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101124030048 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101124004656 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101123134625 lib/logstash/inputs/amqp.rb
logstash-lite-0.2.20101123133737 lib/logstash/inputs/amqp.rb