Sha256: 85519c29acf85137e7ae76c4568eb9124c9341a1415f5150717fe1ef3cbe730d

Contents?: true

Size: 2 KB

Versions: 3

Compression:

Stored size: 2 KB

Contents

class FnordMetric::Worker

  def initialize
    @namespaces = FnordMetric.namespaces
    @opts = FnordMetric.options

    FnordMetric.register(self)
  end

  def initialized
    FnordMetric.log("worker started")
    EM.next_tick(&method(:tick))
  end

  def tick
    redis.blpop(queue_key, 1).callback do |list, event_id|
      EM.next_tick(&method(:tick))
      if event_id
        redis.get(event_key(event_id)).callback do |event_data|
          process_event(event_id, event_data) if event_data
          FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?") unless event_data
          redis.hincrby(stats_key, :events_processed, 1)
        end
      end
    end
  end

  def process_event(event_id, event_data)
    EM.next_tick do
      event = parse_json(event_data)
      if event
        event[:_time] ||= Time.now.to_i
        event[:_eid] = event_id
        announce_event(event)
        publish_event(event)
        expire_event(event_id)
      end
    end
  end

  def pubsub_key
    [@opts[:redis_prefix], 'announce'].join("-")
  end

  def queue_key
    [@opts[:redis_prefix], 'queue'].join("-")
  end

  def event_key(event_id)
    [@opts[:redis_prefix], 'event', event_id].join("-")
  end

  def stats_key
    [@opts[:redis_prefix], 'stats'].join("-")
  end

  def announce_event(event)
    namespace(event[:_namespace]).ready!(redis).announce(event)
  end

  def expire_event(event_id)
    redis.expire(event_key(event_id), @opts[:event_data_ttl])
  end

  def publish_event(event)    
    redis.publish(pubsub_key, event.to_json)
  end

  def namespace(key)
    (@namespaces[key] || @namespaces.first.last).clone
  end

  def parse_json(data)
    event = Yajl::Parser.new(:symbolize_keys => true).parse(data)
    event[:_namespace] = event[:_namespace].to_sym if event[:_namespace]
    event
  rescue Yajl::ParseError => e
    FnordMetric.error "invalid json: #{e.to_s}"; false
  end

  def redis
    @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
  end

end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fnordmetric-1.0.1 lib/fnordmetric/worker.rb
fnordmetric-1.0.0 lib/fnordmetric/worker.rb
fnordmetric-0.9.7 lib/fnordmetric/worker.rb