Sha256: 0b667b6738612386aad2c6f7326874cce4c4a64b315c7f439c343664a354066f

Contents?: true

Size: 1.37 KB

Versions: 11

Compression:

Stored size: 1.37 KB

Contents

require "logstash/outputs/base"
require "amqp" # rubygem 'amqp'
require "mq" # rubygem 'amqp'

# Logstash output that publishes events to an AMQP exchange or queue.
#
# The destination is read from the path of the output URL, which must have
# the form /<type>/<name>, where <type> is one of MQTYPES and <name> is the
# name handed to the matching MQ factory method in #register.
#
# NOTE(review): @url, @urlopts and @logger are not assigned in this class;
# presumably LogStash::Outputs::Base sets them up -- confirm.
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
  # Destination types accepted as the first path segment of the URL.
  MQTYPES = [ "fanout", "queue", "topic" ].freeze

  # Builds the output and validates the URL path.
  #
  # All arguments (url, config and the block) are forwarded unchanged to the
  # superclass through the bare `super` call.
  #
  # Raises RuntimeError when the path does not contain both a type and a
  # name, or when the type is not listed in MQTYPES.
  def initialize(url, config={}, &block)
    super

    # Handle path /<type>/<name>. With a leading slash the first element of
    # the split is the empty string in front of it, so it is discarded.
    _unused, @mqtype, @name = @url.path.split("/", 3)
    if @mqtype.nil? || @name.nil?
      raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
    end

    unless MQTYPES.include?(@mqtype)
      raise "Invalid type '#{@mqtype}' must be one #{MQTYPES.join(", ")}"
    end
  end # def initialize

  # Connects to the broker and resolves the publish target for this output.
  #
  # NOTE(review): only the host is taken from the URL; a port or credentials
  # present in the URL are not passed to AMQP.connect -- confirm intended.
  def register
    @logger.info("Registering output #{@url}")
    @amqp = AMQP.connect(:host => @url.host)
    @mq = MQ.new(@amqp)

    # A case expression with no matching branch evaluates to nil, so @target
    # stays nil for an unknown type exactly as an explicit reset would.
    @target = case @mqtype
              when "fanout"
                @mq.fanout(@name)
              when "queue"
                # NOTE(review): any non-nil/non-false "durable" option turns
                # durability on, including the string "false" if option
                # values are strings -- verify against how @urlopts is built.
                @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
              when "topic"
                @mq.topic(@name)
              end # case @mqtype
  end # def register

  # Publishes an event, serialized with #to_json, to the registered target.
  #
  # Raises RuntimeError when no target has been set up (see #register), the
  # same failure mode as #receive_raw, instead of a NoMethodError on nil.
  def receive(event)
    @logger.debug(["Sending event", { :url => @url, :event => event }])
    assert_registered
    @target.publish(event.to_json)
  end # def receive

  # Publishes an already-serialized payload to the registered target.
  #
  # Raises RuntimeError when no target has been set up (see #register).
  def receive_raw(raw)
    assert_registered
    @target.publish(raw)
  end # def receive_raw

  private

  # Shared guard for the publish paths: fails loudly when @target is unset.
  def assert_registered
    return unless @target.nil?

    raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil"
  end # def assert_registered
end # class LogStash::Outputs::Amqp

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
logstash-lite-0.2.20101222161646 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101208111718 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101207114354 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101201111523 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101129210156 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101129205551 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101129155412 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101124030048 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101124004656 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101123134625 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20101123133737 lib/logstash/outputs/amqp.rb