Sha256: be93e19c90e90ffe4fe4e7d6926f2c68734bd9c6081ef97d5dd2c8b7abc9b0df

Contents?: true

Size: 1.34 KB

Versions: 9

Compression:

Stored size: 1.34 KB

Contents

# Acceptor that subscribes to a list of STOMP topics and hands every JSON
# object it receives to the FnordMetric API as an event.
#
# Incoming messages are buffered in an in-memory queue guarded by a mutex
# (the subscription callbacks are presumably invoked on a stomp client
# thread -- confirm against the stomp gem) and drained one event at a time
# from EventMachine via #push_next_event.
class FnordMetric::STOMPAcceptor

  # Loads the stomp gem and builds a new acceptor.
  #
  # opts - Hash of options; see #initialize for the keys that are read.
  #
  # Logs an error and exits the process with status 1 if the stomp gem
  # cannot be required. Returns the new STOMPAcceptor otherwise.
  def self.start(opts)
    begin
      require "stomp"
    rescue LoadError
      FnordMetric.error("require 'stomp' failed, you need the stomp gem")
      exit 1
    end

    new(opts)
  end

  # Connects to the broker, subscribes to every requested topic and schedules
  # the first queue drain on the EventMachine reactor.
  #
  # opts - Hash with the keys:
  #        :host, :port         - broker address
  #        :username, :password - credentials (sent as login/passcode)
  #        :topics              - enumerable of topic names to subscribe to
  def initialize(opts)
    @mutex = Mutex.new

    client = Stomp::Client.new(:hosts => [{
      :host => opts[:host],
      :port => opts[:port],
      :passcode => opts[:password],
      :login => opts[:username]}])

    opts[:topics].each do |topic|
      client.subscribe(topic){ |msg| enqueue_message(topic, msg) }
    end

    # Wait on the client in a separate thread so the constructor returns.
    Thread.new do
      client.join
    end

    EM.next_tick(&method(:push_next_event))
  end

  # Takes the oldest buffered event (if any) and passes it to the API, then
  # re-schedules itself: immediately when an event was delivered, after a
  # 10ms timer when the queue was empty.
  #
  # Returns true when the queue was empty, otherwise the result of
  # EM.next_tick.
  def push_next_event
    # shift (not pop): events are appended with <<, so taking from the front
    # delivers them in arrival order instead of newest-first.
    nxt = @mutex.synchronize{ events.shift }
    unless nxt
      EM::Timer.new(0.01, &method(:push_next_event))
      return true
    end
    api.event(nxt)
    EM.next_tick(&method(:push_next_event))
  end

  # Returns the lazily created Array used as the event buffer. Callers that
  # touch it are expected to hold @mutex.
  def events
    @events ||= []
  end

  # Returns the lazily created FnordMetric::API built from the global options.
  def api
    @api ||= FnordMetric::API.new(FnordMetric.options)
  end

  # Always true for this acceptor.
  def self.outbound?
    true
  end

  private

  # Parses one received message and appends the resulting event to the
  # buffer. Messages that do not contain a JSON object are dropped.
  #
  # topic - the topic name the subscription was created with
  # msg   - the received message; only #body is read
  def enqueue_message(topic, msg)
    event = parse_event(msg.body)
    return unless event

    # Default the event type to the topic name without its "/topic/" prefix.
    event["_type"] ||= topic.sub(/\A\/topic\//, '')
    @mutex.synchronize{ events << event }
  end

  # Decodes a message body.
  #
  # Returns the parsed Hash, or nil (after logging) when the body is not
  # parseable JSON or parses to something other than an object.
  def parse_event(data)
    parsed = begin
      JSON.parse(data)
    rescue
      # to_s keeps the log line itself from raising when the body is nil.
      FnordMetric.log("[STOMP] received invalid JSON: #{data.to_s[0..60]}")
      return nil
    end

    return parsed if parsed.is_a?(Hash)

    # Arrays and scalars are valid JSON but cannot carry a "_type" key.
    FnordMetric.log("[STOMP] received non-object JSON: #{data.to_s[0..60]}")
    nil
  end

end

Version data entries

9 entries across 9 versions & 3 rubygems

Version Path
johnf-fnordmetric-1.2.10 lib/fnordmetric/acceptors/stomp_acceptor.rb
fnordmetric-1.2.9 lib/fnordmetric/acceptors/stomp_acceptor.rb
johnf-fnordmetric-1.2.7 lib/fnordmetric/acceptors/stomp_acceptor.rb
bp-fnordmetric-1.2.7 lib/fnordmetric/acceptors/stomp_acceptor.rb
fnordmetric-1.2.7 lib/fnordmetric/acceptors/stomp_acceptor.rb
fnordmetric-1.2.6 lib/fnordmetric/acceptors/stomp_acceptor.rb
fnordmetric-1.2.4 lib/fnordmetric/acceptors/stomp_acceptor.rb
fnordmetric-1.2.1 lib/fnordmetric/acceptors/stomp_acceptor.rb
fnordmetric-1.2.0 lib/fnordmetric/acceptors/stomp_acceptor.rb