Sha256: 610e277c5cfce36f3b1589f3a822cf6a055cdbe30627d038766c9eeea2916c3f

Contents?: true

Size: 759 Bytes

Versions: 9

Compression:

Stored size: 759 Bytes

Contents

class FnordMetric::FyrehoseAcceptor

  def self.start(opts)
    require "fyrehose"
    require "fyrehose/reactor"

    new(opts)
  end

  def initialize(opts)
    reactor = EM.connect(opts[:host], opts[:port], Fyrehose::Reactor)

    reactor.on_message do |channel, data|
      event = JSON.parse(data)
      event["_type"] ||= channel
      events << event
      push_next_event
    end

    opts[:channels].each do |channel|
      reactor.subscribe(channel)
    end
  end

  def push_next_event
    return true if events.empty?
    api.event(@events.pop)
    EM.next_tick(&method(:push_next_event))
  end

  def events
    @events ||= []
  end

  def api
    @api ||= FnordMetric::API.new(FnordMetric.options)
  end

  def self.outboud?
    true
  end

end

Version data entries

9 entries across 9 versions & 3 rubygems

Version Path
johnf-fnordmetric-1.2.10 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
fnordmetric-1.2.9 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
johnf-fnordmetric-1.2.7 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
bp-fnordmetric-1.2.7 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
fnordmetric-1.2.7 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
fnordmetric-1.2.6 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
fnordmetric-1.2.4 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
fnordmetric-1.2.1 lib/fnordmetric/acceptors/fyrehose_acceptor.rb
fnordmetric-1.2.0 lib/fnordmetric/acceptors/fyrehose_acceptor.rb