Sha256: ff226f39af83fee7d45748253b30609640b5d512bd5429ca9d5d1a079509693c

Contents?: true

Size: 1.24 KB

Versions: 2

Compression:

Stored size: 1.24 KB

Contents

# frozen_string_literal: true

require 'rails_autoscale_agent/logger'
require 'singleton'
require 'time'

module RailsAutoscaleAgent
  module WorkerAdapters
    # Worker adapter that measures per-queue latency for Que by reading the
    # earliest run_at of non-errored rows in the que_jobs table and pushing one
    # measurement per known queue into the given store.
    class Que
      include RailsAutoscaleAgent::Logger
      include Singleton

      # Queue names that are reported on even when the query returns no rows
      # for them, so a measurement is pushed on every collection.
      DEFAULT_QUEUES = ['default'].freeze

      class << self
        # Queue names known so far. Set when the singleton instance is built
        # and extended by #collect! with every queue name the query returns,
        # so a queue keeps being reported (with latency 0) after it drains.
        attr_accessor :queues
      end

      def initialize
        # Hand out a copy so in-place changes to `queues` never leak into the
        # shared constant.
        self.class.queues = DEFAULT_QUEUES.dup
      end

      # Whether the ::Que constant is defined in this process.
      #
      # @return [String, nil] the (truthy) result of `defined?`, or nil
      def enabled?
        defined? ::Que
      end

      # Pushes one latency measurement per known queue into +store+ and writes
      # a single debug log line summarising them.
      #
      # @param store [#push] receives (latency_ms, time, queue_name) per queue
      # @return [void]
      def collect!(store)
        log_msg = String.new
        now = Time.now

        # Ignore failed jobs (they skew latency measurement due to the original run_at)
        sql = 'SELECT queue, min(run_at) FROM que_jobs WHERE error_count = 0 GROUP BY queue'
        run_at_by_queue = ActiveRecord::Base.connection.select_rows(sql).to_h
        self.class.queues |= run_at_by_queue.keys

        self.class.queues.each do |queue|
          next if queue.nil? || queue.empty?

          latency_ms = latency_ms_for(run_at_by_queue[queue], now)
          store.push latency_ms, now, queue
          log_msg << "que.#{queue}=#{latency_ms} "
        end

        logger.debug log_msg unless log_msg.empty?
      end

      private

      # Milliseconds between +run_at+ and +now+, rounded up and never negative.
      #
      # @param run_at [Time, String, nil] earliest run_at for a queue, as
      #   returned by the database driver; nil when the queue has no rows
      # @param now [Time] the collection timestamp
      # @return [Integer] latency in milliseconds (0 when there is no job)
      def latency_ms_for(run_at, now)
        return 0 unless run_at

        # NOTE(review): Time.parse reads a zone-less string as local time;
        # confirm the database session time zone matches the process zone.
        run_at = Time.parse(run_at) if run_at.is_a?(String)

        # A run_at in the future means the earliest job is only scheduled, not
        # waiting, so report no latency instead of a negative value.
        [((now - run_at) * 1000).ceil, 0].max
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rails_autoscale_agent-0.9.0.beta.2 lib/rails_autoscale_agent/worker_adapters/que.rb
rails_autoscale_agent-0.9.0.beta.1 lib/rails_autoscale_agent/worker_adapters/que.rb