Sha256: a648a8eb6905035ed6f0907a98b3aa083bcf8106ecbfbda2e433186938f79faf

Contents?: true

Size: 1.04 KB

Versions: 1

Compression:

Stored size: 1.04 KB

Contents

require "nest"

module Ost
  VERSION = "0.0.2"
  TIMEOUT = ENV["OST_TIMEOUT"] || 2

  class Queue
    attr :ns

    def initialize(name)
      @ns = Nest.new(:ost)[name]
    end

    def push(value)
      redis.lpush(ns, value)
    end

    def each(&block)
      loop do
        _, item = redis.brpop(ns, TIMEOUT)
        next if item.nil? or item.empty?

        begin
          block.call(item)
        rescue Exception => e
          error = "#{Time.now} #{ns[item]} => #{e.inspect}"

          redis.rpush   ns[:errors], error
          redis.publish ns[:errors], error
        end
      end
    end

    def errors
      redis.lrange ns[:errors], 0, -1
    end

    alias << push
    alias pop each

  private

    def redis
      Ost.redis
    end
  end

  @queues = Hash.new do |hash, key|
    hash[key] = Queue.new(key)
  end

  def self.[](queue)
    @queues[queue]
  end

  def self.connect(options = {})
    @redis = Redis.connect(options)
  end

  def self.redis
    @redis ||= Redis.connect
  end

  def self.redis=(redis)
    @redis = redis
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ost-0.0.2 lib/ost.rb