require 'spec_helper'

# Specs for DispatchRider::QueueServices::AwsSqs.
#
# Every example runs against canned AWS SDK responses: the top-level `before`
# hook turns on the SDK's request stubbing and routes each client request to
# `fake_response`, so the queue URL and visibility timeout seen by the adapter
# are the values defined here.
describe DispatchRider::QueueServices::AwsSqs do
  # Overridden in nested contexts that need a shorter timeout.
  let(:visibility_timeout) { 100 }

  # Canned response carrying both the queue URL and the queue attributes.
  let(:fake_response) do
    AWS::SQS::Client.new.stub_for(:get_queue_url).tap do |response|
      response.data[:queue_url] = "the.queue.url"
      response.data[:attributes] = { "VisibilityTimeout" => visibility_timeout }
    end
  end

  before do
    AWS.config(stub_requests: true)
    allow_any_instance_of(AWS::SQS::Client).to receive(:client_request).and_return(fake_response)
  end

  subject(:aws_sqs_queue) do
    DispatchRider::QueueServices::AwsSqs.new(:name => "normal_priority")
  end

  # Shared fixture: makes receive_message return exactly one message whose
  # body is the JSON for subject "foo" / body {bar: "baz"}, and stubs the
  # checksum verification on the queue.
  shared_context "with a message waiting in the sqs queue" do
    let(:response_attributes) do
      {
        "SenderId" => "123456789012",
        "SentTimestamp" => Time.now.to_i.to_s,
        "ApproximateReceivedCount" => "12",
        "ApproximateFirstReceiveTimestamp" => (Time.now + 12).to_i.to_s,
      }
    end

    let(:response_message) do
      {
        :message_id => 12345,
        :md5_of_body => "mmmddd555",
        :body => { :subject => "foo", :body => { :bar => "baz" } }.to_json,
        :receipt_handle => "HANDLE",
        :attributes => response_attributes,
      }
    end

    before do
      response = AWS::SQS::Client.new.stub_for(:receive_message)
      response.data[:messages] = [response_message]

      allow_any_instance_of(AWS::SQS::Client::V20121105).to receive(:receive_message).and_return(response)
      allow_any_instance_of(AWS::SQS::Queue).to receive(:verify_receive_message_checksum).and_return([])
    end
  end

  describe "#assign_storage" do
    context "when the aws gem is installed" do
      context "when the name of the queue is passed in the options" do
        it "should return an instance representing the aws sqs queue" do
          aws_sqs_queue.assign_storage(:name => 'normal_priority')
          expect(aws_sqs_queue.queue.url).to eq('the.queue.url')
        end
      end

      context "when the url of the queue is passed in the options" do
        it "should return an instance representing the aws sqs queue" do
          aws_sqs_queue.assign_storage(:url => 'https://sqs.us-east-1.amazonaws.com/12345/QueueName')
          # The URL comes from the stubbed response, not from the option passed in.
          expect(aws_sqs_queue.queue.url).to eq('the.queue.url')
        end
      end

      context "when neither the name nor the url of the queue is passed in the options" do
        it "should raise an exception" do
          expect {
            aws_sqs_queue.assign_storage(:foo => 'bar')
          }.to raise_exception(DispatchRider::RecordInvalid)
        end
      end
    end
  end

  describe "#insert" do
    it "should insert an item into the queue" do
      obj = { 'subject' => 'foo', 'body' => 'bar' }.to_json
      expect(aws_sqs_queue.queue).to receive(:send_message).with(obj)
      aws_sqs_queue.insert(obj)
    end
  end

  describe "#pop" do
    context "when the sqs queue has items in it" do
      include_context "with a message waiting in the sqs queue"

      context "when the block runs faster than the timeout" do
        it "should yield the first item in the queue" do
          aws_sqs_queue.pop do |message|
            expect(message.subject).to eq('foo')
            expect(message.body).to eq({ 'bar' => 'baz' })
          end
        end
      end

      context "when the block runs slower than the timeout" do
        let(:visibility_timeout) { 1 }

        it "should raise" do
          expect {
            aws_sqs_queue.pop do |_message|
              # Deliberately outlast the 1 second visibility timeout.
              sleep(1.1)
            end
          }.to raise_exception(/message: foo,.+ took .+ seconds while the timeout was 1/)
        end
      end
    end

    context "when the sqs queue is empty" do
      before do
        allow(aws_sqs_queue.queue).to receive(:receive_message).and_return(nil)
      end

      it "should not yield" do
        expect { |b| aws_sqs_queue.pop(&b) }.not_to yield_control
      end
    end
  end

  describe "received message methods" do
    include_context "with a message waiting in the sqs queue"

    it "should set the visibility timeout when extend is called" do
      expect_any_instance_of(AWS::SQS::ReceivedMessage).to receive(:visibility_timeout=).with(10)
      expect_any_instance_of(AWS::SQS::ReceivedMessage).to receive(:visibility_timeout=).with(0)

      aws_sqs_queue.pop do |message|
        message.extend_timeout(10)
        expect(message.total_timeout).to eq(10)

        message.return_to_queue
        # total_timeout is expected to stay at 10 after returning the message.
        expect(message.total_timeout).to eq(10)
      end
    end
  end

  describe "#construct_message_from" do
    context "when the item is directly published to AWS::SQS" do
      let(:sqs_message) do
        OpenStruct.new(:body => { 'subject' => 'foo', 'body' => 'bar' }.to_json)
      end

      it "should return a message" do
        result = aws_sqs_queue.construct_message_from(sqs_message)
        expect(result.subject).to eq('foo')
        expect(result.body).to eq('bar')
      end
    end

    context "when the item is published through AWS::SNS" do
      # The dispatch payload is JSON nested inside a "Notification" envelope.
      let(:sqs_message) do
        OpenStruct.new(
          :body => {
            "Type" => "Notification",
            "Message" => { 'subject' => 'foo', 'body' => 'bar' }.to_json
          }.to_json
        )
      end

      it "should return a message" do
        result = aws_sqs_queue.construct_message_from(sqs_message)
        expect(result.subject).to eq('foo')
        expect(result.body).to eq('bar')
      end
    end
  end

  describe "#delete" do
    let(:item_in_queue) { Object.new }

    it "should delete the first message from the queue" do
      expect(item_in_queue).to receive(:delete)
      aws_sqs_queue.delete(item_in_queue)
    end
  end

  describe "#size" do
    it "should return the size of the aws queue" do
      expect(aws_sqs_queue.queue).to receive(:approximate_number_of_messages)
      aws_sqs_queue.size
    end
  end
end