Sha256: a648a8eb6905035ed6f0907a98b3aa083bcf8106ecbfbda2e433186938f79faf
Contents?: true
Size: 1.04 KB
Versions: 1
Compression:
Stored size: 1.04 KB
Contents
require "nest"

# Ost: lightweight work queues stored in Redis lists.
#
# Queues are obtained with Ost[name]; values are enqueued with #push / #<<
# and consumed with #each / #pop.
module Ost
  VERSION = "0.0.2"

  # Timeout handed to BRPOP while waiting for an item. Taken from the
  # OST_TIMEOUT environment variable when set (as a String), otherwise 2.
  TIMEOUT = ENV["OST_TIMEOUT"] || 2

  # A single named queue.
  class Queue
    # The Nest key for this queue (built as Nest.new(:ost)[name]).
    attr_reader :ns

    # name - identifier of the queue; nested under the :ost Nest key.
    def initialize(name)
      @ns = Nest.new(:ost)[name]
    end

    # Enqueue +value+ with LPUSH on the queue key.
    def push(value)
      redis.lpush(ns, value)
    end

    # Consume the queue forever, passing each popped item to the block.
    #
    # Items are fetched with BRPOP using TIMEOUT; nil or empty results
    # (for example when the wait times out) are skipped. When the block
    # raises, a "<time> <key> => <exception>" line is appended to the
    # ns[:errors] list and published on the ns[:errors] channel, then the
    # loop moves on to the next item.
    #
    # Raises ArgumentError when called without a block.
    def each(&block)
      # Without this guard every popped item would be consumed and recorded
      # as a NoMethodError, silently discarding queued work.
      raise ArgumentError, "no block given" unless block

      loop do
        _, item = redis.brpop(ns, TIMEOUT)
        next if item.nil? || item.empty?

        begin
          block.call(item)
        # Only job-level failures are recorded. Interrupt, SignalException,
        # SystemExit and NoMemoryError are deliberately NOT rescued, so the
        # worker can be stopped (Ctrl-C, SIGTERM, exit) while a job runs.
        rescue StandardError, ScriptError => e
          error = "#{Time.now} #{ns[item]} => #{e.inspect}"
          redis.rpush ns[:errors], error
          redis.publish ns[:errors], error
        end
      end
    end

    # Every recorded error line for this queue (LRANGE 0..-1 on ns[:errors]).
    def errors
      redis.lrange ns[:errors], 0, -1
    end

    alias << push
    alias pop each

    private

    # The connection shared through Ost.redis.
    def redis
      Ost.redis
    end
  end

  # Queue instances are created on first access and reused per name.
  @queues = Hash.new do |hash, key|
    hash[key] = Queue.new(key)
  end

  # Fetch (creating it if needed) the queue registered under +queue+.
  def self.[](queue)
    @queues[queue]
  end

  # Replace the shared connection with Redis.connect(options).
  def self.connect(options = {})
    @redis = Redis.connect(options)
  end

  # The shared connection; opened with Redis.connect on first use.
  def self.redis
    @redis ||= Redis.connect
  end

  # Inject an existing connection object.
  def self.redis=(redis)
    @redis = redis
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
ost-0.0.2 | lib/ost.rb |