lib/fnordmetric/inbound_stream.rb in fnordmetric-0.6.1 vs lib/fnordmetric/inbound_stream.rb in fnordmetric-0.6.2
- old
+ new
@@ -1,8 +1,6 @@
-require 'securerandom'
class FnordMetric::InboundStream < EventMachine::Connection
-
@@opts = nil
def self.start(opts)
@@opts = opts
EM.start_server(*opts[:inbound_stream], self)
@@ -11,22 +9,10 @@
def receive_data(chunk)
@buffer << chunk
EM.defer{ next_event }
end
- def push_event(event_id, event_data)
- prefix = @@opts[:redis_prefix]
-
- @redis.hincrby "#{prefix}-stats", "events_received", 1
- @redis.set "#{prefix}-event-#{event_id}", event_data
- @redis.lpush "#{prefix}-queue", event_id
- @redis.expire "#{prefix}-event-#{event_id}", @@opts[:event_queue_ttl]
-
- @events_buffered -= 1
- close_connection?
- end
-
def next_event
read_next_event
push_next_event
end
@@ -37,31 +23,28 @@
end
end
def push_next_event
return true if @events.empty?
- push_event(get_next_uuid, @events.pop)
+ @events_buffered -= 1
+ @api.event(@events.pop)
+ close_connection?
EM.next_tick(&method(:push_next_event))
end
- def get_next_uuid
- SecureRandom.uuid
- end
-
def close_connection?
- @redis.quit unless @streaming || (@events_buffered!=0)
+ @api.disconnect unless @streaming || (@events_buffered!=0)
end
def post_init
- @redis = Redis.connect(:url => @@opts[:redis_url])
+ @api = FnordMetric::API.new(@@opts)
@events_buffered = 0
@streaming = true
@buffer = ""
@events = []
end
def unbind
@streaming = false
close_connection?
end
-
end