Sha256: 51cdb19ceb600dd10d89ac9d0beef082789fbf4fe7a8b6a45dbd0cb6a32aae58

Contents?: true

Size: 1.31 KB

Versions: 6

Compression:

Stored size: 1.31 KB

Contents

module Sidekiq::LimitFetch::Global
  module Monitor
    extend self

    HEARTBEAT_NAMESPACE = 'heartbeat:'
    PROCESSOR_NAMESPACE = 'processor:'

    HEARTBEAT_TTL   = 18
    REFRESH_TIMEOUT = 10

    def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
      Thread.new do
        loop do
          update_heartbeat ttl
          invalidate_old_processors
          sleep timeout
        end
      end
    end

    private

    def update_heartbeat(ttl)
      Sidekiq.redis do |it|
        it.pipelined do
          it.set processor_key, true
          it.set heartbeat_key, true
          it.expire heartbeat_key, ttl
        end
      end
    end

    def invalidate_old_processors
      Sidekiq.redis do |it|
        it.keys(PROCESSOR_NAMESPACE + '*').each do |processor|
          processor.sub! PROCESSOR_NAMESPACE, ''
          next if it.get heartbeat_key processor

          it.del processor_key processor
          %w(limit_fetch:probed:* limit_fetch:busy:*).each do |pattern|
            it.keys(pattern).each do |queue|
              it.lrem queue, 0, processor
            end
          end
        end
      end
    end

    def heartbeat_key(processor=Selector.uuid)
      HEARTBEAT_NAMESPACE + processor
    end

    def processor_key(processor=Selector.uuid)
      PROCESSOR_NAMESPACE + processor
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
sidekiq-limit_fetch-2.1.1 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-2.1.0 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-2.0.2 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-2.0.1 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-2.0 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-1.7 lib/sidekiq/limit_fetch/global/monitor.rb