Sha256: a1d6a28c0ac3405cef1b51379cee845d54d957951a3da3c3c0facbb243f89050
Contents?: true
Size: 1.66 KB
Versions: 9
Compression:
Stored size: 1.66 KB
Contents
require 'singleton' require 'thread' module PulseMeter module CommandAggregator class Async include Singleton MAX_QUEUE_LENGTH = 10_000 attr_reader :max_queue_length def initialize @max_queue_length = MAX_QUEUE_LENGTH @queue = Queue.new @buffer = [] @in_multi = false @consumer_thread = run_consumer end def multi @in_multi = true yield ensure @in_multi = false send_buffer_to_queue end def method_missing(*args) @buffer << args send_buffer_to_queue unless @in_multi end def wait_for_pending_events(max_seconds = 1) left_to_wait = max_seconds.to_f sleep_step = 0.01 while has_pending_events? && left_to_wait > 0 left_to_wait -= sleep_step sleep(sleep_step) end end private def has_pending_events? !@queue.empty? end def send_buffer_to_queue if @queue.size < @max_queue_length @queue << @buffer end @buffer = [] end def redis PulseMeter.redis end def consume_commands # redis and @queue are threadsafe while commands = @queue.pop begin redis.multi do commands.each do |command| redis.send(*command) end end rescue StandardError => e PulseMeter.error "error in consumer thread: #{e}, #{e.backtrace.join("\n")}" end end end def run_consumer Thread.new do consume_commands end end end end end
Version data entries
9 entries across 9 versions & 1 rubygems