lib/fnordmetric/worker.rb in fnordmetric-0.7.5 vs lib/fnordmetric/worker.rb in fnordmetric-0.9.7
- old
+ new
@@ -1,46 +1,41 @@
class FnordMetric::Worker
- def initialize(namespaces, opts)
- @namespaces = {}
- @opts = opts
- configure(namespaces)
- end
+ def initialize
+ @namespaces = FnordMetric.namespaces
+ @opts = FnordMetric.options
- def ready!
- @redis = EM::Hiredis.connect(@opts[:redis_url])
- tick
+ FnordMetric.register(self)
end
- def configure(namespaces)
- namespaces.each do |key, block|
- @namespaces[key] = FnordMetric::Namespace.new(key, @opts.clone)
- @namespaces[key].instance_eval(&block)
- end
+ def initialized
+ FnordMetric.log("worker started")
+ EM.next_tick(&method(:tick))
end
def tick
- @redis.blpop(queue_key, 1).callback do |list, event_id|
+ redis.blpop(queue_key, 1).callback do |list, event_id|
EM.next_tick(&method(:tick))
if event_id
- @redis.get(event_key(event_id)).callback do |event_data|
- process_event(event_id, event_data) if event_data
+ redis.get(event_key(event_id)).callback do |event_data|
+ process_event(event_id, event_data) if event_data
FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?") unless event_data
- @redis.hincrby(stats_key, :events_processed, 1)
+ redis.hincrby(stats_key, :events_processed, 1)
end
end
end
end
def process_event(event_id, event_data)
- EM.defer do
- parse_json(event_data).tap do |event|
+ EM.next_tick do
+ event = parse_json(event_data)
+ if event
event[:_time] ||= Time.now.to_i
event[:_eid] = event_id
announce_event(event)
- publish_event(event)
- expire_event(event_id)
+ publish_event(event)
+ expire_event(event_id)
end
end
end
def pubsub_key
@@ -57,28 +52,34 @@
def stats_key
[@opts[:redis_prefix], 'stats'].join("-")
end
- def announce_event(event)
- namespace(event[:_namespace]).ready!(@redis).announce(event)
+ def announce_event(event)
+ namespace(event[:_namespace]).ready!(redis).announce(event)
end
def expire_event(event_id)
- @redis.expire(event_key(event_id), @opts[:event_data_ttl])
+ redis.expire(event_key(event_id), @opts[:event_data_ttl])
end
def publish_event(event)
- @redis.publish(pubsub_key, event[:_eid])
+ redis.publish(pubsub_key, event.to_json)
end
def namespace(key)
(@namespaces[key] || @namespaces.first.last).clone
end
def parse_json(data)
event = Yajl::Parser.new(:symbolize_keys => true).parse(data)
event[:_namespace] = event[:_namespace].to_sym if event[:_namespace]
event
+ rescue Yajl::ParseError => e
+ FnordMetric.error "invalid json: #{e.to_s}"; false
+ end
+
+ def redis
+ @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
end
end