Sha256: 8ed4ece83bb10cc317ddac9a33b46d85109ceabded63516753696d775ac8859d

Contents?: true

Size: 928 Bytes

Versions: 1

Compression:

Stored size: 928 Bytes

Contents

require_relative 'util'

module Upperkut
  # Buffer of items for a single worker, stored in one Redis list.
  #
  # Items are LPUSHed onto the head of the list at #key and RPOPed from its
  # tail, so the buffer is drained in the order items were enqueued.
  #
  # +encode_json_items+, +decode_json_items+ and +to_underscore+ are not
  # defined in this class; presumably they come from Upperkut::Util.
  class Strategy
    include Upperkut::Util

    attr_accessor :worker, :redis

    # @param worker [#name] object whose +name+ is used to build the Redis key
    # @param redis [Object] Redis client; this class calls +lpush+, +rpop+
    #   (inside +multi+), +llen+ and +lrange+ on it
    def initialize(worker, redis)
      self.worker = worker
      self.redis  = redis
    end

    # Encodes +items+ and LPUSHes them onto the worker's list.
    #
    # @param items [Array, nil] items to enqueue
    # @return [false] when +items+ is nil or empty (nothing is sent to Redis)
    # @return [Object] otherwise, whatever +redis.lpush+ returns
    def push_items(items = [])
      return false if items.nil? || items.empty?

      redis.lpush(key, encode_json_items(items))
    end

    # Pops up to +batch_size+ items from the tail of the list inside a single
    # MULTI block and decodes them.
    #
    # @param batch_size [Integer] maximum number of items to pop
    # @return [Object] whatever +decode_json_items+ returns for the popped
    #   payloads
    def fetch_items(batch_size = 1000)
      stop = [batch_size, size].min

      # Queue the commands on the object yielded by +multi+ instead of on
      # +redis+ itself, so the block does not depend on the client being
      # implicitly switched into transaction mode.
      popped = redis.multi do |transaction|
        # RPOP pairs with the LPUSH in #push_items: the tail holds the
        # earliest-pushed element, which is also the one #latency inspects.
        stop.times { transaction.rpop(key) }
      end

      # +size+ is read before the transaction starts, so the list may have
      # shrunk in the meantime; a pop on an empty list replies nil, and those
      # replies are not items.
      decode_json_items(Array(popped).compact)
    end

    # @return [Integer] length of the worker's list as reported by LLEN
    def size
      redis.llen(key)
    end

    # Seconds elapsed since the tail element of the list was enqueued.
    #
    # @return [Numeric] 0 when the list is empty or the element carries no
    #   'enqueued_at' value; otherwise now minus that value
    def latency
      oldest = decode_json_items(redis.lrange(key, -1, -1)).first
      return 0 unless oldest

      now = Time.now.to_f
      # Fall back to +now+ itself so a missing timestamp yields exactly 0
      # instead of a tiny negative number from a second clock read.
      now - oldest.fetch('enqueued_at', now).to_f
    end

    private

    # Redis key of the list backing this worker's buffer.
    def key
      "upperkut:buffers:#{to_underscore(worker.name)}"
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
upperkut-0.1.2 lib/upperkut/strategy.rb