spec/queue_spec.rb in agent-0.1.0 vs spec/queue_spec.rb in agent-0.9.0

- old
+ new

@@ -1,53 +1,351 @@ -require 'helper' +require "spec_helper" -describe Agent::Transport::Queue do - include Agent::Transport +describe Agent::Queue do - it "should support synchronous, unbuffered communication" do - lambda { Queue.new("spec") }.should_not raise_error + context "with an buffered queue" do + before do + @queue = Agent::Queue::Buffered.new(String, 2) + end - q = Queue.new("spec") - q.max.should == 1 - q.async?.should be_false + it "should be buffered" do + @queue.should be_buffered + end - lambda { q.send("hello") }.should_not raise_error - lambda { q.send("hello", true) }.should raise_error(ThreadError, "buffer full") + it "should not be unbuffered" do + @queue.should_not be_unbuffered + end - q.receive.should == "hello" - lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty") - end + it "should raise an error if the queue size is <= 0" do + lambda{ Agent::Queue::Buffered.new(String, 0) }.should raise_error(Agent::Errors::InvalidQueueSize) + lambda{ Agent::Queue::Buffered.new(String, -1) }.should raise_error(Agent::Errors::InvalidQueueSize) + end - it "should support asynchronous, buffered communication" do - lambda { Queue.new("spec", 2) }.should_not raise_error + it "should raise an erro when an object of an invalid type is pushed" do + lambda { @queue.push(1) }.should raise_error(Agent::Errors::InvalidType) + end - q = Queue.new("spec", 2) - q.max.should == 2 - q.async?.should be_true + it "should enqueue and dequeue in order" do + 20.times{|i| @queue.push(i.to_s, :deferred => true) } - lambda { q.send("hello 1") }.should_not raise_error - lambda { q.send("hello 2", true) }.should_not raise_error(ThreadError, "buffer full") - lambda { q.send("hello 3", true) }.should raise_error(ThreadError, "buffer full") + previous = -1 - q.receive.should == "hello 1" - q.receive.should == "hello 2" - lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty") - end + 20.times do |i| + o = @queue.pop[0].to_i + o.should > previous + previous = o + end + end - it "should persist data between queue objects" do - q = Queue.new("spec") - q.send "hello" + context "when the queue is empty" do + it "should hold any attempts to pop from it" do + @queue.operations.should be_empty + @queue.pop(:deferred => true) + @queue.operations.should_not be_empty + end - q = Queue.new("spec") - q.receive.should == "hello" + it "should be able to be pushed to" do + @queue.push("1") + end + + it "should increase in size when pushed to" do + @queue.size.should == 0 + @queue.push("1") + @queue.size.should == 1 + end + + it "should be pushable" do + @queue.push?.should == true + end + + it "should not be poppable" do + @queue.pop?.should == false + end + end + + context "when there are elements in the queue and still space left" do + before do + @queue.push("1") + end + + it "should be able to be pushed to" do + @queue.push("1") + end + + it "should increase in size when pushed to" do + @queue.size.should == 1 + @queue.push("1") + @queue.size.should == 2 + end + + it "should be able to be popped from" do + @queue.pop[0].should == "1" + end + + it "should decrease in size when popped from" do + @queue.size.should == 1 + @queue.pop + @queue.size.should == 0 + end + + it "should be pushable" do + @queue.push?.should == true + end + + it "should be poppable" do + @queue.pop?.should == true + end + end + + context "when the queue is full" do + before do + 2.times { @queue.push("1") } + end + + it "should hold any attempts to push to it" do + @queue.operations.should be_empty + @queue.push("1", :deferred => true) + @queue.operations.should_not be_empty + end + + it "should be able to be popped from" do + @queue.pop[0].should == "1" + end + + it "should not be pushable" do + @queue.push?.should == false + end + + it "should be poppable" do + @queue.pop?.should == true + end + end + + context "when being closed" do + before do + @push1, @push2, @push3 = (1..3).map{ @queue.push("1", :deferred => true) } + end + + it "should go from open to closed" do + @queue.should_not be_closed + @queue.should be_open + @queue.close + @queue.should be_closed + @queue.should_not be_open + end + + it "should close all the waiting operations" do + @push1.should be_sent + @push2.should be_sent + @push3.should_not be_sent + @push3.should_not be_closed + + @queue.close + + @push3.should be_closed + end + + it "should clear all waiting operations" do + @queue.operations.size.should == 1 + @queue.pushes.size.should == 1 + @queue.close + @queue.operations.size.should == 0 + @queue.pushes.size.should == 0 + end + + it "should clear all elements at rest" do + @queue.queue.size.should == 2 + @queue.close + @queue.queue.size.should == 0 + end + + it "should raise an error when being acted upon afterwards" do + @queue.close + lambda{ @queue.close }.should raise_error(Agent::Errors::ChannelClosed) + lambda{ @queue.push("1") }.should raise_error(Agent::Errors::ChannelClosed) + lambda{ @queue.pop }.should raise_error(Agent::Errors::ChannelClosed) + end + end + + context "when removing operations" do + before do + @pushes = (1..8).map{|i| @queue.push(i.to_s, :deferred => true) } + end + + it "should remove the operations" do + removable_pushes = @pushes.values_at(5, 6) # values "6" and "7" + @queue.remove_operations(removable_pushes) + while @queue.pop? + i = @queue.pop[0] + i.should_not be_nil + i.should_not == "6" + i.should_not == "7" + end + end + end end - it "should clear registry on close" do - q = Queue.new("spec") - q.send "hello" - q.close + context "with a unbuffered queue" do + before do + @queue = Agent::Queue::Unbuffered.new(String) + end - q = Queue.new("spec") - lambda { q.receive(true) }.should raise_error(ThreadError, "buffer empty") - end + it "should not be buffered" do + @queue.should_not be_buffered + end + + it "should be unbuffered" do + @queue.should be_unbuffered + end + + it "should enqueue and dequeue in order" do + 20.times{|i| @queue.push(i.to_s, :deferred => true) } + + previous = -1 + + 20.times do |i| + o = @queue.pop[0].to_i + o.should > previous + previous = o + end + end + + context "when there are no operations waiting" do + it "should not be poppable" do + @queue.pop?.should == false + end + + it "should not be pushable" do + @queue.push?.should == false + end + + it "should queue pushes" do + @queue.operations.size.should == 0 + push = @queue.push("1", :deferred => true) + push.should_not be_sent + @queue.operations.size.should == 1 + end + + it "should queue pops" do + @queue.operations.size.should == 0 + pop = @queue.pop(:deferred => true) + pop.should_not be_received + @queue.operations.size.should == 1 + end + end + + context "when there is a pop waiting" do + before do + @pop = @queue.pop(:deferred => true) + end + + it "should not be poppable" do + @queue.pop?.should == false + end + + it "should be pushable" do + @queue.push?.should == true + end + + it "should execute a push and the waiting pop immediately" do + push = @queue.push("1", :deferred => true) + @pop.should be_received + push.should be_sent + @pop.object.should == "1" + end + + it "should queue pops" do + @queue.operations.size.should == 1 + pop = @queue.pop(:deferred => true) + pop.should_not be_received + @queue.operations.size.should == 2 + end + end + + context "when there is a push waiting" do + before do + @push = @queue.push("1", :deferred => true) + end + + it "should be poppable" do + @queue.pop?.should == true + end + + it "should not be pushable" do + @queue.push?.should == false + end + + it "should queue pushes" do + @queue.operations.size.should == 1 + push = @queue.push("1", :deferred => true) + push.should_not be_sent + @queue.operations.size.should == 2 + end + + it "should execute a pop and the waiting push immediately" do + pop = @queue.pop(:deferred => true) + @push.should be_sent + pop.should be_received + pop.object.should == "1" + end + end + + context "when being closed" do + before do + @push1, @push2 = (1..2).map{ @queue.push("1", :deferred => true) } + end + + it "should go from open to closed" do + @queue.should_not be_closed + @queue.should be_open + @queue.close + @queue.should be_closed + @queue.should_not be_open + end + + it "should close all the waiting operations" do + @push1.should_not be_sent + @push1.should_not be_closed + @push2.should_not be_sent + @push2.should_not be_closed + + @queue.close + + @push1.should be_closed + @push2.should be_closed + end + + it "should clear all waiting operations" do + @queue.operations.size.should == 2 + @queue.pushes.size.should == 2 + @queue.close + @queue.operations.size.should == 0 + @queue.pushes.size.should == 0 + end + + it "should raise an error when being acted upon afterwards" do + @queue.close + lambda{ @queue.close }.should raise_error(Agent::Errors::ChannelClosed) + lambda{ @queue.push("1") }.should raise_error(Agent::Errors::ChannelClosed) + lambda{ @queue.pop }.should raise_error(Agent::Errors::ChannelClosed) + end + end + + context "when removing operations" do + before do + @pushes = (1..8).map{|i| @queue.push(i.to_s, :deferred => true) } + end + + it "should remove the operations" do + removable_pushes = @pushes.values_at(5, 6) # values "6" and "7" + @queue.remove_operations(removable_pushes) + while @queue.pop? + i = @queue.pop[0] + i.should_not be_nil + i.should_not == "6" + i.should_not == "7" + end + end + end + end end