lib/fnordmetric/worker.rb in fnordmetric-0.7.5 vs lib/fnordmetric/worker.rb in fnordmetric-0.9.7

- old
+ new

@@ -1,46 +1,41 @@ class FnordMetric::Worker - def initialize(namespaces, opts) - @namespaces = {} - @opts = opts - configure(namespaces) - end + def initialize + @namespaces = FnordMetric.namespaces + @opts = FnordMetric.options - def ready! - @redis = EM::Hiredis.connect(@opts[:redis_url]) - tick + FnordMetric.register(self) end - def configure(namespaces) - namespaces.each do |key, block| - @namespaces[key] = FnordMetric::Namespace.new(key, @opts.clone) - @namespaces[key].instance_eval(&block) - end + def initialized + FnordMetric.log("worker started") + EM.next_tick(&method(:tick)) end def tick - @redis.blpop(queue_key, 1).callback do |list, event_id| + redis.blpop(queue_key, 1).callback do |list, event_id| EM.next_tick(&method(:tick)) if event_id - @redis.get(event_key(event_id)).callback do |event_data| - process_event(event_id, event_data) if event_data + redis.get(event_key(event_id)).callback do |event_data| + process_event(event_id, event_data) if event_data FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?") unless event_data - @redis.hincrby(stats_key, :events_processed, 1) + redis.hincrby(stats_key, :events_processed, 1) end end end end def process_event(event_id, event_data) - EM.defer do - parse_json(event_data).tap do |event| + EM.next_tick do + event = parse_json(event_data) + if event event[:_time] ||= Time.now.to_i event[:_eid] = event_id announce_event(event) - publish_event(event) - expire_event(event_id) + publish_event(event) + expire_event(event_id) end end end def pubsub_key @@ -57,28 +52,34 @@ def stats_key [@opts[:redis_prefix], 'stats'].join("-") end - def announce_event(event) - namespace(event[:_namespace]).ready!(@redis).announce(event) + def announce_event(event) + namespace(event[:_namespace]).ready!(redis).announce(event) end def expire_event(event_id) - @redis.expire(event_key(event_id), @opts[:event_data_ttl]) + redis.expire(event_key(event_id), @opts[:event_data_ttl]) end def publish_event(event) - @redis.publish(pubsub_key, event[:_eid]) + redis.publish(pubsub_key, event.to_json) end def namespace(key) (@namespaces[key] || @namespaces.first.last).clone end def parse_json(data) event = Yajl::Parser.new(:symbolize_keys => true).parse(data) event[:_namespace] = event[:_namespace].to_sym if event[:_namespace] event + rescue Yajl::ParseError => e + FnordMetric.error "invalid json: #{e.to_s}"; false + end + + def redis + @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL end end