Sha256: 1ee00e1dd8c9ee1102fb0433b7458027c53f77b72639106b34bdda0453d53664
Contents?: true
Size: 997 Bytes
Versions: 9
Compression:
Stored size: 997 Bytes
Contents
require_relative 'util'

module Upperkut
  # Buffer of pending items for one worker, stored in a single Redis list.
  #
  # Items are appended with RPUSH and consumed from the head with LPOP, so
  # the list is drained in insertion order. Serialization is delegated to
  # the +encode_json_items+ / +decode_json_items+ helpers from Upperkut::Util.
  class Strategy
    include Upperkut::Util

    attr_accessor :worker, :redis

    # @param worker [#name] object whose +name+ is used to derive the Redis key
    # @param redis  [Object] Redis client responding to rpush, lpop, llen,
    #   lrange, del and multi
    def initialize(worker, redis)
      self.worker = worker
      self.redis = redis
    end

    # Appends items to the tail of the buffer.
    #
    # A single Hash is treated as a one-element batch.
    #
    # @param items [Array, Hash, nil] items to enqueue
    # @return [false, Object] +false+ when there is nothing to push,
    #   otherwise whatever +redis.rpush+ returns
    def push_items(items = [])
      items = [items] if items.is_a?(Hash)
      # nil is treated like an empty batch instead of raising NoMethodError.
      return false if items.nil? || items.empty?

      redis.rpush(key, encode_json_items(items))
    end

    # Pops up to +batch_size+ items from the head of the buffer inside a
    # single MULTI/EXEC transaction.
    #
    # @param batch_size [Integer] maximum number of items to pop
    # @return [Object] result of +decode_json_items+ for the popped payloads
    def fetch_items(batch_size = 1000)
      stop = [batch_size, size].min

      # Queue the commands on the object yielded by #multi rather than on the
      # outer client: older redis-rb versions yield the client itself, newer
      # ones yield a dedicated transaction object and no longer support the
      # implicit form.
      items = redis.multi do |transaction|
        stop.times { transaction.lpop(key) }
      end

      # +size+ was read before the transaction started, so another consumer
      # may have drained part of the list in between; those LPOPs yield nil
      # and must not be handed to the decoder.
      decode_json_items(Array(items).compact)
    end

    # @return [Integer] current length of the buffer list (LLEN)
    def size
      redis.llen(key)
    end

    # Age, in seconds, of the item at the head of the buffer, based on its
    # 'enqueued_at' field.
    #
    # @return [Numeric] 0 when the buffer is empty; 0.0 when the head item
    #   has no 'enqueued_at' key
    def latency
      item = decode_json_items(redis.lrange(key, 0, 0)).first
      return 0 unless item

      now = Time.now.to_f
      # Reuse +now+ as the default so a missing timestamp gives exactly zero
      # instead of a slightly negative value from a second clock read.
      now - item.fetch('enqueued_at', now).to_f
    end

    # Deletes the whole buffer.
    #
    # @return [Object] whatever +redis.del+ returns
    def clear
      redis.del(key)
    end

    private

    # Redis key of this worker's buffer list.
    def key
      "upperkut:buffers:#{to_underscore(worker.name)}"
    end
  end
end
Version data entries
9 entries across 9 versions & 1 rubygems