Sha256: bb6b5f4304d726fe8e1a62eea34afb9d4ee7871e4a94281476a38dfb9f17c46b

Contents?: true

Size: 1.45 KB

Versions: 3

Compression:

Stored size: 1.45 KB

Contents

require "logstash/devutils/rspec/spec_helper"
require "redis"

# Pushes +event_count+ events onto the Redis list +key+ on localhost.
#
# Each event is a LogStash::Event carrying a "sequence" field numbered
# 0...event_count, serialized with #to_json and appended with RPUSH so the
# list preserves the sequence order.
#
# key         - name of the Redis list to append to
# event_count - number of events to push
def populate(key, event_count)
  require "logstash/event"
  redis = Redis.new(:host => "localhost")
  begin
    event_count.times do |value|
      event = LogStash::Event.new("sequence" => value)
      # Each push goes through Stud::try with a 10-attempt enumerator.
      Stud::try(10.times) do
        redis.rpush(key, event.to_json)
      end
    end
  ensure
    # Release the connection so repeated calls do not pile up open sockets.
    redis.quit
  end
end

# Runs +pipeline+ on a background thread and verifies that +queue+ yields
# +event_count+ events whose "sequence" field counts up from 0 in order.
#
# pipeline    - object responding to #run and #shutdown
# queue       - object responding to #pop; popped events are indexed by "sequence"
# event_count - number of events to pop and check
#
# The pipeline is shut down even when an assertion raises, so a failing
# example does not leave the pipeline thread running.
def process(pipeline, queue, event_count)
  Thread.new { pipeline.run }
  event_count.times do |expected_sequence|
    event = queue.pop
    insist { event["sequence"] } == expected_sequence
  end
ensure
  pipeline.shutdown
end # process

# Specs for the redis input reading from a list. Each group picks a random
# 10-digit list key and a random event count, pre-populates the list via
# populate, then checks through process that the events arrive in order.
# NOTE(review): `config` and `input` are presumably DSL helpers supplied by
# the logstash devutils spec helper required at the top of the file — confirm.
describe "inputs/redis", :redis => true do

  describe "read events from a list" do
    key = 10.times.collect { rand(10).to_s }.join("")
    event_count = 1000 + rand(50)
    config <<-CONFIG
      input {
        redis {
          type => "blah"
          key => "#{key}"
          data_type => "list"
        }
      }
    CONFIG

    before(:each) { populate(key, event_count) }

    input { |pipeline, queue| process(pipeline, queue, event_count) }
  end

  # The batch size is random (1..20); it is computed once so the group
  # description reports the value actually written into the config.
  batch_count = rand(20) + 1

  describe "read events from a list with batch_count=#{batch_count}" do
    key = 10.times.collect { rand(10).to_s }.join("")
    event_count = 1000 + rand(50)
    config <<-CONFIG
      input {
        redis {
          type => "blah"
          key => "#{key}"
          data_type => "list"
          batch_count => #{batch_count}
        }
      }
    CONFIG

    before(:each) { populate(key, event_count) }

    input { |pipeline, queue| process(pipeline, queue, event_count) }
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-input-redis-0.1.3 spec/inputs/redis_spec.rb
logstash-input-redis-0.1.2 spec/inputs/redis_spec.rb
logstash-input-redis-0.1.1 spec/inputs/redis_spec.rb