Sha256: 56d025c01d4e555a8172b4a8317ce0768485089fe6eeebba94d17d0d92a15530

Contents?: true

Size: 1009 Bytes

Versions: 3

Compression:

Stored size: 1009 Bytes

Contents

require_relative 'util'

module Upperkut
  class Strategy
    include Upperkut::Util

    attr_accessor :worker, :redis

    def initialize(worker, redis = Redis.new)
      self.worker = worker
      self.redis  = redis
    end

    def push_items(items = [])
      items = [items] if items.is_a?(Hash)
      return false if items.empty?
      redis.rpush(key, encode_json_items(items))
    end

    def fetch_items(batch_size = 1000)
      stop = [batch_size, size].min

      items = redis.multi do
        stop.times do
          redis.lpop(key)
        end
      end

      decode_json_items(items)
    end

    def size
      redis.llen(key)
    end

    def latency
      item = redis.lrange(key, 0, 0)
      item = decode_json_items(item).first
      return 0 unless item
      now = Time.now.to_f
      now - item.fetch('enqueued_at', Time.now).to_f
    end

    def clear
      redis.del(key)
    end

    private

    def key
      "upperkut:buffers:#{to_underscore(worker.name)}"
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
upperkut-0.5.2 lib/upperkut/strategy.rb
upperkut-0.5.1 lib/upperkut/strategy.rb
upperkut-0.5.0 lib/upperkut/strategy.rb