Sha256: 1ee00e1dd8c9ee1102fb0433b7458027c53f77b72639106b34bdda0453d53664

Contents?: true

Size: 997 Bytes

Versions: 9

Compression:

Stored size: 997 Bytes

Contents

require_relative 'util'

module Upperkut
  # Redis-list-backed buffer of pending items for a single worker.
  #
  # Items are appended to the tail of the list (RPUSH) and consumed from its
  # head (LPOP), so the buffer is drained in insertion order. Serialization is
  # delegated to +encode_json_items+ / +decode_json_items+, which are expected
  # to come from Upperkut::Util (not visible in this file).
  class Strategy
    include Upperkut::Util

    attr_accessor :worker, :redis

    # @param worker [#name] worker whose +name+ is used to build the list key
    # @param redis [Object] client responding to +rpush+, +lpop+, +llen+,
    #   +lrange+, +del+ and +multi+
    def initialize(worker, redis)
      self.worker = worker
      self.redis  = redis
    end

    # Appends items to the end of the buffer.
    #
    # @param items [Array, Hash, nil] items to enqueue; a single Hash is
    #   wrapped and treated as one item
    # @return [false] when there is nothing to push (+nil+ or empty)
    # @return [Object] otherwise, whatever +redis.rpush+ returns
    def push_items(items = [])
      items = [items] if items.is_a?(Hash)
      # Treat nil like an empty batch instead of raising NoMethodError.
      return false if items.nil? || items.empty?

      redis.rpush(key, encode_json_items(items))
    end

    # Pops up to +batch_size+ items from the head of the buffer inside a single
    # MULTI/EXEC transaction.
    #
    # @param batch_size [Integer] maximum number of items to pop
    # @return [Object] the result of +decode_json_items+ applied to the popped
    #   payloads
    def fetch_items(batch_size = 1000)
      # The length is read before the transaction starts, so with concurrent
      # consumers some LPOPs may come back nil; handling of such entries is
      # left to decode_json_items (NOTE(review): confirm it tolerates nil).
      stop = [batch_size, size].min

      # Queue the commands on the object yielded by #multi. Older redis-rb
      # releases yield the client itself, newer ones yield a dedicated
      # transaction object and no longer support calling the outer client
      # from inside the block.
      items = redis.multi do |transaction|
        stop.times { transaction.lpop(key) }
      end

      decode_json_items(items)
    end

    # @return [Object] number of entries in the buffer, as reported by LLEN
    def size
      redis.llen(key)
    end

    # Age, in seconds, of the item at the head of the buffer.
    #
    # @return [Numeric] 0 when the buffer is empty; otherwise the difference
    #   between the current time and the head item's 'enqueued_at' value
    #   (0.0 when the item carries no 'enqueued_at')
    def latency
      head = decode_json_items(redis.lrange(key, 0, 0)).first
      return 0 unless head

      now = Time.now.to_f
      # Reuse +now+ as the fallback: taking a second clock reading here would
      # yield a slightly negative latency for items without a timestamp.
      now - head.fetch('enqueued_at', now).to_f
    end

    # Deletes the whole buffer.
    #
    # @return [Object] whatever +redis.del+ returns
    def clear
      redis.del(key)
    end

    private

    # Redis key of this worker's buffer, derived from the worker's name via
    # the +to_underscore+ helper.
    def key
      "upperkut:buffers:#{to_underscore(worker.name)}"
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
upperkut-0.4.6 lib/upperkut/strategy.rb
upperkut-0.4.5 lib/upperkut/strategy.rb
upperkut-0.4.4 lib/upperkut/strategy.rb
upperkut-0.4.3 lib/upperkut/strategy.rb
upperkut-0.4.2 lib/upperkut/strategy.rb
upperkut-0.4.1 lib/upperkut/strategy.rb
upperkut-0.4.0 lib/upperkut/strategy.rb
upperkut-0.3.0 lib/upperkut/strategy.rb
upperkut-0.1.4 lib/upperkut/strategy.rb