Sha256: 56d025c01d4e555a8172b4a8317ce0768485089fe6eeebba94d17d0d92a15530
Contents?: true
Size: 1009 Bytes
Versions: 3
Compression:
Stored size: 1009 Bytes
Contents
require_relative 'util' module Upperkut class Strategy include Upperkut::Util attr_accessor :worker, :redis def initialize(worker, redis = Redis.new) self.worker = worker self.redis = redis end def push_items(items = []) items = [items] if items.is_a?(Hash) return false if items.empty? redis.rpush(key, encode_json_items(items)) end def fetch_items(batch_size = 1000) stop = [batch_size, size].min items = redis.multi do stop.times do redis.lpop(key) end end decode_json_items(items) end def size redis.llen(key) end def latency item = redis.lrange(key, 0, 0) item = decode_json_items(item).first return 0 unless item now = Time.now.to_f now - item.fetch('enqueued_at', Time.now).to_f end def clear redis.del(key) end private def key "upperkut:buffers:#{to_underscore(worker.name)}" end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
upperkut-0.5.2 | lib/upperkut/strategy.rb |
upperkut-0.5.1 | lib/upperkut/strategy.rb |
upperkut-0.5.0 | lib/upperkut/strategy.rb |