lib/upperkut/strategy.rb in upperkut-0.5.2 vs lib/upperkut/strategy.rb in upperkut-0.6.0
- old
+ new
@@ -1,54 +1,73 @@
require_relative 'util'
+require_relative 'redis_pool'
module Upperkut
class Strategy
include Upperkut::Util
- attr_accessor :worker, :redis
+ attr_reader :options
- def initialize(worker, redis = Redis.new)
- self.worker = worker
- self.redis = redis
+ def initialize(worker, options = {})
+ @options = options
+ @redis_options = options.fetch(:redis, {})
+ @redis_pool = setup_redis_pool
+ @worker = worker
end
def push_items(items = [])
items = [items] if items.is_a?(Hash)
return false if items.empty?
- redis.rpush(key, encode_json_items(items))
+ redis do |conn|
+ conn.rpush(key, encode_json_items(items))
+ end
end
def fetch_items(batch_size = 1000)
stop = [batch_size, size].min
- items = redis.multi do
- stop.times do
- redis.lpop(key)
+ items = redis do |conn|
+ conn.multi do
+ stop.times { conn.lpop(key) }
end
end
decode_json_items(items)
end
def size
- redis.llen(key)
+ redis do |conn|
+ conn.llen(key)
+ end
end
def latency
- item = redis.lrange(key, 0, 0)
+ item = redis { |conn| conn.lrange(key, 0, 0) }
item = decode_json_items(item).first
return 0 unless item
now = Time.now.to_f
now - item.fetch('enqueued_at', Time.now).to_f
end
def clear
- redis.del(key)
+ redis { |conn| conn.del(key) }
end
private
+ def setup_redis_pool
+ return @redis_options if @redis_options.is_a?(ConnectionPool)
+ RedisPool.new(options.fetch(:redis, {})).create
+ end
+
+ def redis
+ raise ArgumentError, "requires a block" unless block_given?
+ @redis_pool.with do |conn|
+ yield conn
+ end
+ end
+
def key
- "upperkut:buffers:#{to_underscore(worker.name)}"
+ "upperkut:buffers:#{to_underscore(@worker.name)}"
end
end
end