lib/fnordmetric/inbound_stream.rb in fnordmetric-0.6.1 vs lib/fnordmetric/inbound_stream.rb in fnordmetric-0.6.2

- old
+ new

@@ -1,8 +1,6 @@ -require 'securerandom' class FnordMetric::InboundStream < EventMachine::Connection - @@opts = nil def self.start(opts) @@opts = opts EM.start_server(*opts[:inbound_stream], self) @@ -11,22 +9,10 @@ def receive_data(chunk) @buffer << chunk EM.defer{ next_event } end - def push_event(event_id, event_data) - prefix = @@opts[:redis_prefix] - - @redis.hincrby "#{prefix}-stats", "events_received", 1 - @redis.set "#{prefix}-event-#{event_id}", event_data - @redis.lpush "#{prefix}-queue", event_id - @redis.expire "#{prefix}-event-#{event_id}", @@opts[:event_queue_ttl] - - @events_buffered -= 1 - close_connection? - end - def next_event read_next_event push_next_event end @@ -37,31 +23,28 @@ end end def push_next_event return true if @events.empty? - push_event(get_next_uuid, @events.pop) + @events_buffered -= 1 + @api.event(@events.pop) + close_connection? EM.next_tick(&method(:push_next_event)) end - def get_next_uuid - SecureRandom.uuid - end - def close_connection? - @redis.quit unless @streaming || (@events_buffered!=0) + @api.disconnect unless @streaming || (@events_buffered!=0) end def post_init - @redis = Redis.connect(:url => @@opts[:redis_url]) + @api = FnordMetric::API.new(@@opts) @events_buffered = 0 @streaming = true @buffer = "" @events = [] end def unbind @streaming = false close_connection? end - end