Sha256: 4c71a5dd612671d5f9ea03fc691a36bcadd3c5bbd438f5376d0480aafff7c94f

Contents?: true

Size: 1.19 KB

Versions: 6

Compression:

Stored size: 1.19 KB

Contents

module Sidekiq::LimitFetch::Global
  module Monitor
    extend self

    HEARTBEAT_NAMESPACE = 'heartbeat:'
    PROCESSOR_NAMESPACE = 'processor:'

    HEARTBEAT_TTL = 400
    REFRESH_TIMEOUT = 180

    def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
      Thread.new do
        loop do
          update_heartbeat ttl
          invalidate_old_processors
          sleep timeout
        end
      end
    end

    private

    def update_heartbeat(ttl)
      Sidekiq.redis do |it|
        it.set processor_key, true
        it.set heartbeat_key, true
        it.expire heartbeat_key, ttl
      end
    end

    def invalidate_old_processors
      Sidekiq.redis do |it|
        it.keys(PROCESSOR_NAMESPACE + '*').each do |processor|
          processor.sub! PROCESSOR_NAMESPACE, ''
          next if it.get heartbeat_key processor

          it.del processor_key processor
          it.keys('limit_fetch:busy:*').each do |queue|
            it.lrem queue, 0, processor
          end
        end
      end
    end

    def heartbeat_key(processor=Selector.uuid)
      HEARTBEAT_NAMESPACE + processor
    end

    def processor_key(processor=Selector.uuid)
      PROCESSOR_NAMESPACE + processor
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
sidekiq-limit_fetch-1.2 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-1.1 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-1.0 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-0.9 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-0.8 lib/sidekiq/limit_fetch/global/monitor.rb
sidekiq-limit_fetch-0.7 lib/sidekiq/limit_fetch/global/monitor.rb