Sha256: 09803cf5bb8722b101163fae36fab15a59e155eaa8322ee201050a3765c27832
Contents?: true
Size: 1.42 KB
Versions: 2
Compression:
Stored size: 1.42 KB
Contents
require 'securerandom' class FnordMetric::InboundStream < EventMachine::Connection @@opts = nil def self.start(opts) @@opts = opts EM.start_server(*opts[:inbound_stream], self) end def receive_data(chunk) @buffer << chunk EM.defer{ next_event } end def push_event(event_id, event_data) prefix = @@opts[:redis_prefix] @redis.hincrby "#{prefix}-stats", "events_received", 1 @redis.set "#{prefix}-event-#{event_id}", event_data @redis.lpush "#{prefix}-queue", event_id @redis.expire "#{prefix}-event-#{event_id}", @@opts[:event_queue_ttl] @events_buffered -= 1 close_connection? end def next_event read_next_event push_next_event end def read_next_event while (event = @buffer.slice!(/^(.*)\n/)) @events_buffered += 1 @events << event end end def push_next_event return true if @events.empty? push_event(get_next_uuid, @events.pop) EM.next_tick(&method(:push_next_event)) end def get_next_uuid SecureRandom.uuid end def close_connection? @redis.quit unless @streaming || (@events_buffered!=0) end def post_init @redis = Redis.connect(:url => @@opts[:redis_url]) @events_buffered = 0 @streaming = true @buffer = "" @events = [] end def unbind @streaming = false close_connection? end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fnordmetric-0.6.1 | lib/fnordmetric/inbound_stream.rb |
fnordmetric-0.6.0 | lib/fnordmetric/inbound_stream.rb |