require_relative 'util' module Upperkut class Strategy include Upperkut::Util attr_accessor :worker, :redis def initialize(worker, redis = Redis.new) self.worker = worker self.redis = redis end def push_items(items = []) items = [items] if items.is_a?(Hash) return false if items.empty? redis.rpush(key, encode_json_items(items)) end def fetch_items(batch_size = 1000) stop = [batch_size, size].min items = redis.multi do stop.times do redis.lpop(key) end end decode_json_items(items) end def size redis.llen(key) end def latency item = redis.lrange(key, 0, 0) item = decode_json_items(item).first return 0 unless item now = Time.now.to_f now - item.fetch('enqueued_at', Time.now).to_f end def clear redis.del(key) end private def key "upperkut:buffers:#{to_underscore(worker.name)}" end end end