Sha256: 6dee272c7b9c0f018a85fd8f447cde7bc1a56d6ef67f59ad36c9926325fefbb4

Contents?: true

Size: 1.67 KB

Versions: 3

Compression:

Stored size: 1.67 KB

Contents

require "amqp" # rubygem 'amqp'
require "logstash/outputs/base"
require "logstash/namespace"
require "mq" # rubygem 'amqp'

# Output that publishes events to an AMQP destination described by a URL.
#
# The URL path must have the form /<type>/<name>, where <type> is one of
# MQTYPES and <name> is the exchange or queue name. Host, port, user and
# password are read from the URL in #register.
#
# NOTE(review): @url, @urlopts and @logger are not assigned in this class;
# presumably LogStash::Outputs::Base#initialize sets them -- confirm.
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
  # Destination kinds accepted as the first path segment of the URL.
  # Frozen so the list used for validation cannot be altered at runtime.
  MQTYPES = [ "fanout", "queue", "topic" ].freeze

  # Validates the URL path and remembers the destination type and name.
  #
  # url    - the amqp URL (forwarded unchanged to the parent initializer)
  # config - options hash (forwarded unchanged to the parent initializer)
  #
  # Raises RuntimeError if the path is not /<type>/<name>, if <name> is
  # empty, or if <type> is not listed in MQTYPES.
  public
  def initialize(url, config={}, &block)
    super

    # Handle path /<type>/<name>
    # The path starts with "/", so the first element of the split is the
    # empty string before it. The limit of 3 keeps any further "/" inside
    # the name.
    _unused, @mqtype, @name = @url.path.split("/", 3)

    # A trailing slash ("/queue/") yields an empty name rather than nil, so
    # check for both: the path contract requires an actual name.
    if @mqtype.nil? || @name.nil? || @name.empty?
      raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
    end

    unless MQTYPES.include?(@mqtype)
      raise "Invalid type '#{@mqtype}' must be one #{MQTYPES.join(", ")}"
    end
  end # def initialize

  # Connects to the broker and resolves the publish target (@target) for the
  # configured type. Must be called before #receive or #receive_raw.
  public
  def register
    @logger.info("Registering output #{@url}")
    amqpsettings = {
      :host => @url.host,
      :port => (@url.port || 5672), # fall back to 5672 when the URL has no port
    }
    # Only pass credentials that are actually present in the URL.
    amqpsettings[:user] = @url.user if @url.user
    amqpsettings[:pass] = @url.password if @url.password
    @amqp = AMQP.connect(amqpsettings)
    @mq = MQ.new(@amqp)

    # @mqtype was validated against MQTYPES in initialize, so one of these
    # branches is expected to match.
    @target =
      case @mqtype
      when "fanout"
        @mq.fanout(@name)
      when "queue"
        # Any truthy "durable" URL option turns durability on.
        @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
      when "topic"
        @mq.topic(@name)
      end # case @mqtype
  end # def register

  # Publishes the event, serialized with to_json, to the registered target.
  #
  # Raises RuntimeError if no target has been registered.
  public
  def receive(event)
    ensure_target
    @logger.debug(["Sending event", { :url => @url, :event => event }])
    @target.publish(event.to_json)
  end # def receive

  # This is used by the ElasticSearch AMQP/River output.
  # Publishes the given payload as-is, without JSON serialization.
  #
  # Raises RuntimeError if no target has been registered.
  public
  def receive_raw(raw)
    ensure_target
    @target.publish(raw)
  end # def receive_raw

  # Fails with a descriptive error when @target is nil, so both publish paths
  # report the problem the same way instead of a NoMethodError on nil.
  private
  def ensure_target
    return unless @target.nil?

    raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil"
  end # def ensure_target
end # class LogStash::Outputs::Amqp

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-lite-0.2.20110206003603 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20110203130400 lib/logstash/outputs/amqp.rb
logstash-lite-0.2.20110122143801 lib/logstash/outputs/amqp.rb