spec/queue_spec.rb in agent-0.1.0 vs spec/queue_spec.rb in agent-0.9.0
- old
+ new
@@ -1,53 +1,351 @@
-require 'helper'
+require "spec_helper"
-describe Agent::Transport::Queue do
- include Agent::Transport
+describe Agent::Queue do
- it "should support synchronous, unbuffered communication" do
- lambda { Queue.new("spec") }.should_not raise_error
+ context "with an buffered queue" do
+ before do
+ @queue = Agent::Queue::Buffered.new(String, 2)
+ end
- q = Queue.new("spec")
- q.max.should == 1
- q.async?.should be_false
+ it "should be buffered" do
+ @queue.should be_buffered
+ end
- lambda { q.send("hello") }.should_not raise_error
- lambda { q.send("hello", true) }.should raise_error(ThreadError, "buffer full")
+ it "should not be unbuffered" do
+ @queue.should_not be_unbuffered
+ end
- q.receive.should == "hello"
- lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty")
- end
+ it "should raise an error if the queue size is <= 0" do
+ lambda{ Agent::Queue::Buffered.new(String, 0) }.should raise_error(Agent::Errors::InvalidQueueSize)
+ lambda{ Agent::Queue::Buffered.new(String, -1) }.should raise_error(Agent::Errors::InvalidQueueSize)
+ end
- it "should support asynchronous, buffered communication" do
- lambda { Queue.new("spec", 2) }.should_not raise_error
+ it "should raise an erro when an object of an invalid type is pushed" do
+ lambda { @queue.push(1) }.should raise_error(Agent::Errors::InvalidType)
+ end
- q = Queue.new("spec", 2)
- q.max.should == 2
- q.async?.should be_true
+ it "should enqueue and dequeue in order" do
+ 20.times{|i| @queue.push(i.to_s, :deferred => true) }
- lambda { q.send("hello 1") }.should_not raise_error
- lambda { q.send("hello 2", true) }.should_not raise_error(ThreadError, "buffer full")
- lambda { q.send("hello 3", true) }.should raise_error(ThreadError, "buffer full")
+ previous = -1
- q.receive.should == "hello 1"
- q.receive.should == "hello 2"
- lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty")
- end
+ 20.times do |i|
+ o = @queue.pop[0].to_i
+ o.should > previous
+ previous = o
+ end
+ end
- it "should persist data between queue objects" do
- q = Queue.new("spec")
- q.send "hello"
+ context "when the queue is empty" do
+ it "should hold any attempts to pop from it" do
+ @queue.operations.should be_empty
+ @queue.pop(:deferred => true)
+ @queue.operations.should_not be_empty
+ end
- q = Queue.new("spec")
- q.receive.should == "hello"
+ it "should be able to be pushed to" do
+ @queue.push("1")
+ end
+
+ it "should increase in size when pushed to" do
+ @queue.size.should == 0
+ @queue.push("1")
+ @queue.size.should == 1
+ end
+
+ it "should be pushable" do
+ @queue.push?.should == true
+ end
+
+ it "should not be poppable" do
+ @queue.pop?.should == false
+ end
+ end
+
+ context "when there are elements in the queue and still space left" do
+ before do
+ @queue.push("1")
+ end
+
+ it "should be able to be pushed to" do
+ @queue.push("1")
+ end
+
+ it "should increase in size when pushed to" do
+ @queue.size.should == 1
+ @queue.push("1")
+ @queue.size.should == 2
+ end
+
+ it "should be able to be popped from" do
+ @queue.pop[0].should == "1"
+ end
+
+ it "should decrease in size when popped from" do
+ @queue.size.should == 1
+ @queue.pop
+ @queue.size.should == 0
+ end
+
+ it "should be pushable" do
+ @queue.push?.should == true
+ end
+
+ it "should be poppable" do
+ @queue.pop?.should == true
+ end
+ end
+
+ context "when the queue is full" do
+ before do
+ 2.times { @queue.push("1") }
+ end
+
+ it "should hold any attempts to push to it" do
+ @queue.operations.should be_empty
+ @queue.push("1", :deferred => true)
+ @queue.operations.should_not be_empty
+ end
+
+ it "should be able to be popped from" do
+ @queue.pop[0].should == "1"
+ end
+
+ it "should not be pushable" do
+ @queue.push?.should == false
+ end
+
+ it "should be poppable" do
+ @queue.pop?.should == true
+ end
+ end
+
+ context "when being closed" do
+ before do
+ @push1, @push2, @push3 = (1..3).map{ @queue.push("1", :deferred => true) }
+ end
+
+ it "should go from open to closed" do
+ @queue.should_not be_closed
+ @queue.should be_open
+ @queue.close
+ @queue.should be_closed
+ @queue.should_not be_open
+ end
+
+ it "should close all the waiting operations" do
+ @push1.should be_sent
+ @push2.should be_sent
+ @push3.should_not be_sent
+ @push3.should_not be_closed
+
+ @queue.close
+
+ @push3.should be_closed
+ end
+
+ it "should clear all waiting operations" do
+ @queue.operations.size.should == 1
+ @queue.pushes.size.should == 1
+ @queue.close
+ @queue.operations.size.should == 0
+ @queue.pushes.size.should == 0
+ end
+
+ it "should clear all elements at rest" do
+ @queue.queue.size.should == 2
+ @queue.close
+ @queue.queue.size.should == 0
+ end
+
+ it "should raise an error when being acted upon afterwards" do
+ @queue.close
+ lambda{ @queue.close }.should raise_error(Agent::Errors::ChannelClosed)
+ lambda{ @queue.push("1") }.should raise_error(Agent::Errors::ChannelClosed)
+ lambda{ @queue.pop }.should raise_error(Agent::Errors::ChannelClosed)
+ end
+ end
+
+ context "when removing operations" do
+ before do
+ @pushes = (1..8).map{|i| @queue.push(i.to_s, :deferred => true) }
+ end
+
+ it "should remove the operations" do
+ removable_pushes = @pushes.values_at(5, 6) # values "6" and "7"
+ @queue.remove_operations(removable_pushes)
+ while @queue.pop?
+ i = @queue.pop[0]
+ i.should_not be_nil
+ i.should_not == "6"
+ i.should_not == "7"
+ end
+ end
+ end
end
- it "should clear registry on close" do
- q = Queue.new("spec")
- q.send "hello"
- q.close
+ context "with a unbuffered queue" do
+ before do
+ @queue = Agent::Queue::Unbuffered.new(String)
+ end
- q = Queue.new("spec")
- lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty")
- end
+ it "should not be buffered" do
+ @queue.should_not be_buffered
+ end
+
+ it "should be unbuffered" do
+ @queue.should be_unbuffered
+ end
+
+ it "should enqueue and dequeue in order" do
+ 20.times{|i| @queue.push(i.to_s, :deferred => true) }
+
+ previous = -1
+
+ 20.times do |i|
+ o = @queue.pop[0].to_i
+ o.should > previous
+ previous = o
+ end
+ end
+
+ context "when there are no operations waiting" do
+ it "should not be poppable" do
+ @queue.pop?.should == false
+ end
+
+ it "should not be pushable" do
+ @queue.push?.should == false
+ end
+
+ it "should queue pushes" do
+ @queue.operations.size.should == 0
+ push = @queue.push("1", :deferred => true)
+ push.should_not be_sent
+ @queue.operations.size.should == 1
+ end
+
+ it "should queue pops" do
+ @queue.operations.size.should == 0
+ pop = @queue.pop(:deferred => true)
+ pop.should_not be_received
+ @queue.operations.size.should == 1
+ end
+ end
+
+ context "when there is a pop waiting" do
+ before do
+ @pop = @queue.pop(:deferred => true)
+ end
+
+ it "should not be poppable" do
+ @queue.pop?.should == false
+ end
+
+ it "should be pushable" do
+ @queue.push?.should == true
+ end
+
+ it "should execute a push and the waiting pop immediately" do
+ push = @queue.push("1", :deferred => true)
+ @pop.should be_received
+ push.should be_sent
+ @pop.object.should == "1"
+ end
+
+ it "should queue pops" do
+ @queue.operations.size.should == 1
+ pop = @queue.pop(:deferred => true)
+ pop.should_not be_received
+ @queue.operations.size.should == 2
+ end
+ end
+
+ context "when there is a push waiting" do
+ before do
+ @push = @queue.push("1", :deferred => true)
+ end
+
+ it "should be poppable" do
+ @queue.pop?.should == true
+ end
+
+ it "should not be pushable" do
+ @queue.push?.should == false
+ end
+
+ it "should queue pushes" do
+ @queue.operations.size.should == 1
+ push = @queue.push("1", :deferred => true)
+ push.should_not be_sent
+ @queue.operations.size.should == 2
+ end
+
+ it "should execute a pop and the waiting push immediately" do
+ pop = @queue.pop(:deferred => true)
+ @push.should be_sent
+ pop.should be_received
+ pop.object.should == "1"
+ end
+ end
+
+ context "when being closed" do
+ before do
+ @push1, @push2 = (1..2).map{ @queue.push("1", :deferred => true) }
+ end
+
+ it "should go from open to closed" do
+ @queue.should_not be_closed
+ @queue.should be_open
+ @queue.close
+ @queue.should be_closed
+ @queue.should_not be_open
+ end
+
+ it "should close all the waiting operations" do
+ @push1.should_not be_sent
+ @push1.should_not be_closed
+ @push2.should_not be_sent
+ @push2.should_not be_closed
+
+ @queue.close
+
+ @push1.should be_closed
+ @push2.should be_closed
+ end
+
+ it "should clear all waiting operations" do
+ @queue.operations.size.should == 2
+ @queue.pushes.size.should == 2
+ @queue.close
+ @queue.operations.size.should == 0
+ @queue.pushes.size.should == 0
+ end
+
+ it "should raise an error when being acted upon afterwards" do
+ @queue.close
+ lambda{ @queue.close }.should raise_error(Agent::Errors::ChannelClosed)
+ lambda{ @queue.push("1") }.should raise_error(Agent::Errors::ChannelClosed)
+ lambda{ @queue.pop }.should raise_error(Agent::Errors::ChannelClosed)
+ end
+ end
+
+ context "when removing operations" do
+ before do
+ @pushes = (1..8).map{|i| @queue.push(i.to_s, :deferred => true) }
+ end
+
+ it "should remove the operations" do
+ removable_pushes = @pushes.values_at(5, 6) # values "6" and "7"
+ @queue.remove_operations(removable_pushes)
+ while @queue.pop?
+ i = @queue.pop[0]
+ i.should_not be_nil
+ i.should_not == "6"
+ i.should_not == "7"
+ end
+ end
+ end
+ end
end