Sha256: 756d0c9058d2c93ff86c717ca37c3fd05950b966b81dd651f4a92e48440d9e4c

Contents?: true

Size: 1.07 KB

Versions: 9

Compression:

Stored size: 1.07 KB

Contents

class FnordMetric::AMQPAcceptor

  def self.start(opts)
    begin
      require "amqp"
    rescue LoadError
      FnordMetric.error("require 'amqp' failed, you need the amqp gem")
      exit 1
    end

    new(opts)
  end

  def initialize(opts)
    amqp = AMQP.connect(:host => 'firehose')
    amqp_channel = AMQP::Channel.new(amqp)

    msg_handler = lambda do |channel, data|
      event = begin
        JSON.parse(data)
      rescue
        FnordMetric.log("[AMQP] received invalid JSON: #{data[0..60]}")
      end

      if event
        event["_type"] ||= channel
        events << event
        push_next_event
      end
    end

    opts[:channels].each do |channel|
      queue = amqp_channel.queue(channel, :auto_delete => true)
      queue.subscribe{ |data| msg_handler[channel, data] }
    end
  end

  def push_next_event
    return true if events.empty?
    api.event(@events.pop)
    EM.next_tick(&method(:push_next_event))
  end

  def events
    @events ||= []
  end

  def api
    @api ||= FnordMetric::API.new(FnordMetric.options)
  end

  def self.outbound?
    true
  end

end

Version data entries

9 entries across 9 versions & 3 rubygems

Version Path
johnf-fnordmetric-1.2.10 lib/fnordmetric/acceptors/amqp_acceptor.rb
fnordmetric-1.2.9 lib/fnordmetric/acceptors/amqp_acceptor.rb
johnf-fnordmetric-1.2.7 lib/fnordmetric/acceptors/amqp_acceptor.rb
bp-fnordmetric-1.2.7 lib/fnordmetric/acceptors/amqp_acceptor.rb
fnordmetric-1.2.7 lib/fnordmetric/acceptors/amqp_acceptor.rb
fnordmetric-1.2.6 lib/fnordmetric/acceptors/amqp_acceptor.rb
fnordmetric-1.2.4 lib/fnordmetric/acceptors/amqp_acceptor.rb
fnordmetric-1.2.1 lib/fnordmetric/acceptors/amqp_acceptor.rb
fnordmetric-1.2.0 lib/fnordmetric/acceptors/amqp_acceptor.rb