Sha256: 8586f0054122dedd2862cd9071fc00036c7b6cef915d3f043999b22699f84802

Contents?: true

Size: 1.89 KB

Versions: 6

Compression:

Stored size: 1.89 KB

Contents

# Pulls event ids off a Redis queue, loads the matching event payload,
# hands it to the configured namespace and announces it on a pub/sub
# channel. All Redis keys are derived from @opts[:redis_prefix].
class FnordMetric::Worker

  # namespaces - enumerable of (key, block) pairs; each block is evaluated
  #              in the context of a freshly built FnordMetric::Namespace.
  # opts       - options hash; this class reads :redis_uri, :redis_prefix
  #              and :event_data_ttl from it.
  def initialize(namespaces, opts)
    @namespaces = {}
    @opts = opts
    configure(namespaces)
  end

  # Opens the Redis connection and starts the polling loop.
  def ready!
    @redis = EM::Hiredis.connect(@opts[:redis_uri])
    tick
  end

  # Builds one FnordMetric::Namespace per (key, block) pair, giving each its
  # own copy of the options, and runs the block against it via instance_eval.
  def configure(namespaces)
    namespaces.each do |key, block|
      @namespaces[key] = FnordMetric::Namespace.new(key, @opts.clone)
      @namespaces[key].instance_eval(&block)
    end
  end

  # One iteration of the polling loop: pop the next event id from the queue
  # (BLPOP with timeout 0), fetch the stored event data for that id, process
  # it, then schedule the next iteration and bump the processed counter.
  def tick
    @redis.blpop(queue_key, 0).callback do |list, event_id|
      @redis.get(event_key(event_id)).callback do |event_data|
        if event_data
          process_event(event_id, event_data)
        else
          # The id was queued but no payload is stored under its event key.
          FnordMetric.log("oops, lost an event :(")
        end
        EM.next_tick(&method(:tick))
        # NOTE: incremented for lost events as well as processed ones.
        @redis.hincrby(stats_key, :events_processed, 1)
      end
    end
  end

  # Decodes and dispatches a single event on EventMachine's deferred
  # thread pool.
  #
  # event_id   - id popped from the queue.
  # event_data - raw JSON string stored under the event key.
  #
  # Payloads that are not valid JSON, or that do not decode to a Hash, are
  # logged and skipped instead of raising inside the deferred block (nothing
  # in this class rescues there; presumably an uncaught error in that thread
  # is fatal to the worker - confirm against the EventMachine version in
  # use). The event key always receives its TTL, so undecodable payloads do
  # not linger in Redis.
  def process_event(event_id, event_data)
    EM.defer do
      event = decode_event(event_id, event_data)
      if event
        event[:_time] ||= Time.now.to_i
        event[:_eid] = event_id
        announce_event(event)
        publish_event(event)
      end
      expire_event(event_id)
    end
  end

  # Channel on which processed event ids are published.
  def pubsub_key
    [@opts[:redis_prefix], 'announce'].join("-")
  end

  # List from which pending event ids are popped.
  def queue_key
    [@opts[:redis_prefix], 'queue'].join("-")
  end

  # Key under which the raw data of the given event is stored.
  def event_key(event_id)
    [@opts[:redis_prefix], 'event', event_id].join("-")
  end

  # Hash holding worker statistics (see the :events_processed field in #tick).
  def stats_key
    [@opts[:redis_prefix], 'stats'].join("-")
  end

  # Hands the event to the namespace named by event[:_namespace]
  # (see #namespace for the fallback).
  def announce_event(event)
    namespace(event[:_namespace]).ready!(@redis).announce(event)
  end

  # Sets the configured TTL (@opts[:event_data_ttl]) on the stored event data.
  def expire_event(event_id)
    @redis.expire(event_key(event_id), @opts[:event_data_ttl])
  end

  # Publishes the event id on the announce channel.
  def publish_event(event)
    @redis.publish(pubsub_key, event[:_eid])
  end

  # Returns a clone of the namespace registered under key, falling back to
  # the first configured namespace when key is unknown.
  # NOTE(review): raises NoMethodError when no namespaces are configured.
  def namespace(key)
    (@namespaces[key] || @namespaces.first.last).clone
  end

  # Parses a JSON document, symbolizing hash keys.
  def parse_json(data)
    Yajl::Parser.new(:symbolize_keys => true).parse(data)
  end

  private

  # Returns the decoded event Hash, or nil (after logging the reason) when
  # the payload is not valid JSON or is not a JSON object.
  def decode_event(event_id, event_data)
    event = parse_json(event_data)
    return event if event.is_a?(Hash)

    FnordMetric.log("skipping event #{event_id}: payload is not a JSON object")
    nil
  rescue Yajl::ParseError => e
    FnordMetric.log("skipping event #{event_id}: invalid JSON (#{e.message})")
    nil
  end

end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
fnordmetric-0.5.5 lib/fnordmetric/worker.rb
fnordmetric-0.5.4 lib/fnordmetric/worker.rb
fnordmetric-0.5.3 lib/fnordmetric/worker.rb
fnordmetric-0.5.2 lib/fnordmetric/worker.rb
fnordmetric-0.5.1 lib/fnordmetric/worker.rb
fnordmetric-0.5.0 lib/fnordmetric/worker.rb