Sha256: a1d6a28c0ac3405cef1b51379cee845d54d957951a3da3c3c0facbb243f89050

Contents?: true

Size: 1.66 KB

Versions: 9

Compression:

Stored size: 1.66 KB

Contents

require 'singleton'
require 'thread'

module PulseMeter
  module CommandAggregator
    # Aggregates Redis commands and executes them on a background thread.
    #
    # Any method not defined on this class is recorded as a command
    # (method name plus arguments). Commands are grouped into batches,
    # pushed onto an in-memory queue, and replayed by a consumer thread
    # against PulseMeter.redis, each batch wrapped in +redis.multi+.
    #
    # When the queue already holds +max_queue_length+ batches, new batches
    # are dropped instead of blocking the caller.
    class Async
      include Singleton

      # Maximum number of batches kept in the queue before new ones are dropped.
      MAX_QUEUE_LENGTH = 10_000

      attr_reader :max_queue_length

      # Sets up the queue and the command buffer and starts the consumer thread.
      def initialize
        @max_queue_length = MAX_QUEUE_LENGTH
        @queue = Queue.new
        @buffer = []
        @in_multi = false
        @consumer_thread = run_consumer
      end

      # Collects every command issued inside the block into a single batch.
      #
      # Calls may be nested: only the outermost call flushes the batch, so
      # commands issued after an inner +multi+ still belong to the outer one.
      # The batch is flushed even if the block raises.
      #
      # @yield block issuing the commands to be batched
      # @return the value of the block
      def multi
        outer_in_multi = @in_multi
        @in_multi = true
        yield
      ensure
        # Restore the enclosing state rather than forcing false, otherwise an
        # inner call would end the outer batch prematurely.
        @in_multi = outer_in_multi
        send_buffer_to_queue unless @in_multi
      end

      # Records the call (method name followed by its arguments) as a command.
      # Outside of #multi the command is queued right away as its own batch.
      def method_missing(*args)
        @buffer << args
        send_buffer_to_queue unless @in_multi
      end

      # Waits until all queued batches have been processed by the consumer
      # thread, or until roughly +max_seconds+ have elapsed.
      #
      # @param max_seconds [Numeric] upper bound on the waiting time
      def wait_for_pending_events(max_seconds = 1)
        left_to_wait = max_seconds.to_f
        sleep_step = 0.01
        while has_pending_events? && left_to_wait > 0
          left_to_wait -= sleep_step
          sleep(sleep_step)
        end
      end

      private

      # Keeps respond_to? consistent with method_missing for the commands
      # that the underlying redis object actually understands.
      def respond_to_missing?(name, include_private = false)
        redis.respond_to?(name) || super
      end

      # True while batches are still queued or the consumer is busy with one.
      def has_pending_events?
        return true unless @queue.empty?

        # An empty queue is not enough: the last batch may already be popped
        # but not executed yet. The consumer is idle only once it is blocked
        # waiting on the queue again. A dead consumer can never finish
        # anything, so it does not count as pending work.
        consumer_alive = @consumer_thread && @consumer_thread.alive?
        consumer_alive && @queue.num_waiting.zero?
      end

      # Moves the buffered commands to the queue as one batch and starts a
      # fresh buffer. The batch is dropped when the queue is full.
      def send_buffer_to_queue
        # Nothing buffered (e.g. an empty multi block): do not make the
        # consumer run an empty transaction.
        return if @buffer.empty?

        @queue << @buffer if @queue.size < @max_queue_length
        @buffer = []
      end

      def redis
        PulseMeter.redis
      end

      # Consumer loop: pops batches off the queue and replays each one inside
      # redis.multi. Errors are reported through PulseMeter.error so that a
      # failing batch does not stop the loop.
      def consume_commands
        # redis and @queue are threadsafe
        while commands = @queue.pop
          begin
            redis.multi do
              commands.each do |command|
                redis.send(*command)
              end
            end
          rescue StandardError => e
            PulseMeter.error "error in consumer thread: #{e}, #{e.backtrace.join("\n")}"
          end
        end
      end

      # Starts the background thread running the consumer loop.
      def run_consumer
        Thread.new do
          consume_commands
        end
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygem

Version Path
pulse-meter-0.4.8 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.7 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.6 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.5 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.4 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.3 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.2 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.1 lib/pulse-meter/command_aggregator/async.rb
pulse-meter-0.4.0 lib/pulse-meter/command_aggregator/async.rb