Sha256: aa797658241a6435304ca74810395fb3a37cc2a6cf1d14bec3891c35f2ad24fc

Contents?: true

Size: 1.52 KB

Versions: 2

Compression:

Stored size: 1.52 KB

Contents

require 'pause'
require 'spanx/logger'
require 'spanx/helper/timing'

module Spanx
  module Actor
    # Aggregates per-IP hit counts in memory and periodically flushes them
    # through Spanx::IPChecker.
    #
    # Two background threads cooperate through +semaphore+:
    # * "collector:queue" pops entries off +queue+ and tallies them in +cache+;
    # * "collector:flush" hands the tallied counts to Spanx::IPChecker every
    #   config[:collector][:flush_interval] seconds.
    #
    # +cache+ maps [ip, period_marker] => count, with a default count of 0.
    class Collector
      include Spanx::Helper::Timing

      attr_accessor :queue, :config, :semaphore, :cache

      # @param config [Hash] must provide config[:collector][:resolution] and
      #   config[:collector][:flush_interval]
      # @param queue [#empty?, #size, #pop] source of entries; each popped
      #   entry is splatted into #increment_ip, so it is expected to be an
      #   (ip, timestamp) pair
      def initialize(config, queue)
        @queue = queue
        @config = config
        @semaphore = Mutex.new
        @cache = Hash.new(0)
      end

      # Starts the queue-draining thread and the flushing thread.
      #
      # @return [Thread] the flush thread (the last thread started)
      def run
        Thread.new do
          Thread.current[:name] = "collector:queue"
          loop do
            drain_queue
            sleep 1
          end
        end

        Thread.new do
          Thread.current[:name] = "collector:flush"
          loop do
            flush_cache
            sleep config[:collector][:flush_interval]
          end
        end
      end

      # Adds one hit for +ip+ in the time bucket that +timestamp+ falls into.
      # Not synchronized by itself; callers sharing +cache+ across threads
      # must hold +semaphore+.
      #
      # @param ip the address being counted
      # @param timestamp passed to period_marker together with
      #   config[:collector][:resolution] to compute the bucket
      #   (period_marker is presumably provided by Spanx::Helper::Timing)
      def increment_ip(ip, timestamp)
        cache[[ip, period_marker(config[:collector][:resolution], timestamp)]] += 1
      end

      private

      # Moves everything currently in the queue into the local cache.
      # The lock is taken per entry so the flush thread can interleave.
      def drain_queue
        return if queue.empty?

        Logger.logging "caching [#{queue.size}] keys locally" do
          until queue.empty?
            semaphore.synchronize { increment_ip(*queue.pop) }
          end
        end
      end

      # Pushes the accumulated counts through Spanx::IPChecker.
      #
      # The cache is copied and cleared while holding the lock, but the
      # increment! calls run after the lock is released, so the queue thread
      # is not stalled for the duration of the flush.
      def flush_cache
        pending = semaphore.synchronize do
          snapshot = cache.dup
          reset_cache
          snapshot
        end

        Logger.logging "flushing cache with [#{pending.size}] keys" do
          pending.each_pair do |(ip, marker), count|
            Spanx::IPChecker.new(ip).increment!(count, marker)
          end
        end
      end

      # Empties the cache in place (the Hash object itself is kept) and
      # requests a garbage-collection run.
      def reset_cache
        @cache.clear
        GC.start
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
spanx-0.3.0 lib/spanx/actor/collector.rb
spanx-0.1.1 lib/spanx/actor/collector.rb