Sha256: 9b776556269dd49f90ee806116d0fd3f86461c90513c11c69d161b17176339e6
Contents?: true
Size: 1.71 KB
Versions: 8
Compression:
Stored size: 1.71 KB
Contents
require 'singleton' require 'thread' module PulseMeter module CommandAggregator class Async include Singleton MAX_QUEUE_LENGTH = 10_000 attr_reader :max_queue_length def initialize @max_queue_length = MAX_QUEUE_LENGTH @queue = Queue.new @buffer = [] @in_multi = false @consumer_thread = run_consumer at_exit{ wait_for_pending_events } end def multi @in_multi = true yield ensure @in_multi = false send_buffer_to_queue end def method_missing(*args) @buffer << args send_buffer_to_queue unless @in_multi end def wait_for_pending_events(max_seconds = 1) left_to_wait = max_seconds.to_f sleep_step = 0.01 while has_pending_events? && left_to_wait > 0 left_to_wait -= sleep_step sleep(sleep_step) end end private def has_pending_events? !@queue.empty? end def send_buffer_to_queue if @queue.size < @max_queue_length @queue << @buffer end @buffer = [] end def redis PulseMeter.redis end def consume_commands # redis and @queue are threadsafe while commands = @queue.pop begin redis.multi do commands.each do |command| redis.send(*command) end end rescue StandardError => e PulseMeter.error "error in consumer thread: #{e}, #{e.backtrace.join("\n")}" end end end def run_consumer Thread.new do consume_commands end end end end end
Version data entries
8 entries across 8 versions & 2 rubygems