spec/queue_poller_spec.rb in pheme-0.0.8 vs spec/queue_poller_spec.rb in pheme-0.0.9

- old
+ new

@@ -1,18 +1,7 @@ describe Pheme::QueuePoller do let(:queue_url) { "https://sqs.us-east-1.amazonaws.com/whatever" } - let(:poller) do - poller = double - allow(poller).to receive(:poll).with(kind_of(Hash)) - allow(poller).to receive(:parse_message) - allow(poller).to receive(:before_request) - poller - end - before(:each) do - use_default_configuration! - allow(Aws::SQS::QueuePoller).to receive(:new) { poller } - end describe ".new" do context "when initialized with valid params" do it "does not raise an error" do expect { ExampleQueuePoller.new(queue_url: "queue_url") }.not_to raise_error @@ -30,155 +19,208 @@ expect(ExampleQueuePoller.new(queue_url: "queue_url", max_messages: 5).max_messages).to eq(5) end end end - describe "#parse_message" do - context "with JSON message" do - subject { ExampleQueuePoller.new(queue_url: queue_url) } + let(:poller) { ExampleQueuePoller.new(queue_url: queue_url, format: format) } + let(:message_id) { SecureRandom.uuid } + let(:message) { nil } + let!(:queue_message) do + OpenStruct.new( + body: { Message: message }.to_json, + message_id: message_id, + ) + end - let!(:message) { OpenStruct.new({ - body: '{"Message":"{\"test\":\"test\"}"}' - })} + describe "#parse_body" do + subject { poller.parse_body(queue_message) } - it 'should parse the message correctly' do - expect(subject.parse_message(message).test).to eq("test") - end + context "message is JSON string" do + let(:format) { :json } + let!(:message) { { test: 'test' }.to_json } + its([:test]) { is_expected.to eq('test') } end - context "with CSV message" do - subject { ExampleQueuePoller.new(queue_url: queue_url, format: :csv) } - - let!(:message) { OpenStruct.new({ - body:'{"Message":"test,test2\nvalue,value2\nvalue3,value4"}' - })} - - it 'should parse the message correctly' do - expect(subject.parse_message(message)).to be_a(Array) - expect(subject.parse_message(message).count).to eq(2) + context "message is CSV string" do + let(:format) { :csv } + let(:expected_message) do + [ + { test1: 'value1', test2: 'value2' }, + { test1: 'value3', test2: 'value4' }, + ] end + let(:message) do + [ + %w[test1 test2].join(','), + %w[value1 value2].join(','), + %w[value3 value4].join(','), + ].join("\n") + end + + it { is_expected.to have(2).items } + it { is_expected.to eq(RecursiveOpenStruct.new({ wrapper: expected_message }, recurse_over_arrays: true).wrapper) } end context "with unknown message format" do - subject { ExampleQueuePoller.new(queue_url: queue_url, format: :invalid_format) } + let(:format) { :invalid_format } + let(:message) { 'unkonwn' } - let!(:message) { OpenStruct.new({ - body:'{"Message":"test,test2\nvalue,value2\nvalue3,value4"}' - })} - - it 'should raise an error' do - expect{ subject.parse_message(message) }.to raise_error + it "should raise error" do + expect{ subject }.to raise_error(ArgumentError) end end context "with array JSON message" do - subject { ExampleQueuePoller.new(queue_url: queue_url).parse_message(message) } - let!(:message) { OpenStruct.new({ - body: '{"Message":"[[{\"test\":\"test\"}]]"}' - })} - it 'should parse the message correctly' do - expect(subject.first.first.test).to eq("test") - expect(subject).to be_a Array - expect(subject.first).to be_a Array - expect(subject.first.first).to be_a RecursiveOpenStruct + let(:format) { :json } + let(:message) { [[{ test: 'test' }]].to_json } + + it { is_expected.to be_a(Array) } + its(:first) { is_expected.to be_a(Array) } + its('first.first') { is_expected.to be_a(RecursiveOpenStruct) } + it "parses the nested object" do + expect(subject.first.first.test).to eq('test') end end end describe "#poll" do before(:each) do module ActiveRecord class Base - def self.connection_pool - end + def self.connection_pool; end end end end context "with connection pool block" do let(:mock_connection_pool) { double } + subject { ExampleQueuePoller.new(queue_url: queue_url, connection_pool_block: true) } + let(:message) { { status: 'complete' } } + let(:notification) { { 'MessageId' => SecureRandom.uuid, 'Message' => message.to_json, 'Type' => 'Notification' } } + let!(:queue_message) do + OpenStruct.new( + body: notification.to_json, + message_id: message_id, + ) + end before(:each) do allow(ActiveRecord::Base).to receive(:connection_pool) { mock_connection_pool } allow(mock_connection_pool).to receive(:with_connection).and_yield + allow(subject.queue_poller).to receive(:poll).and_yield(queue_message) + allow(subject.queue_poller).to receive(:delete_message).with(queue_message) end - subject { ExampleQueuePoller.new(queue_url: queue_url, connection_pool_block: true) } - it "uses the connection pool block" do expect(mock_connection_pool).to receive(:with_connection) subject.poll end end context "without connection pool block" do subject { ExampleQueuePoller.new(queue_url: queue_url) } + let(:message) { { status: 'complete' } } + let(:notification) { { 'MessageId' => SecureRandom.uuid, 'Message' => message.to_json, 'Type' => 'Notification' } } + let!(:queue_message) do + OpenStruct.new( + body: notification.to_json, + message_id: message_id, + ) + end + before(:each) do + allow(subject.queue_poller).to receive(:poll).and_yield(queue_message) + allow(subject.queue_poller).to receive(:delete_message).with(queue_message) + end + it "does not call ActiveRecord" do expect(ActiveRecord::Base).not_to receive(:connection_pool) subject.poll end end context "when a valid message is yielded" do - let(:message_body) do + subject { ExampleQueuePoller.new(queue_url: queue_url) } + let(:message) { { id: "id-123", status: "complete" } } + let(:notification) do { - id: "id-123", - status: "complete", + 'MessageId' => SecureRandom.uuid, + 'Message' => message.to_json, + 'Type' => 'Notification', } end - let(:message) do - message = double - allow(message).to receive(:body) do - {Message: message_body.to_json,}.to_json - end - message + let!(:queue_message) do + OpenStruct.new( + body: notification.to_json, + message_id: message_id, + ) end + before(:each) do - allow(poller).to receive(:poll).and_yield(message) + allow(subject.queue_poller).to receive(:poll).and_yield(queue_message) + allow(subject.queue_poller).to receive(:delete_message).with(queue_message) end - subject { ExampleQueuePoller.new(queue_url: queue_url) } - it "handles the message" do - expect(ExampleMessageHandler).to receive(:new).with(message: RecursiveOpenStruct.new(message_body)) + expect(ExampleMessageHandler).to receive(:new).with(message: RecursiveOpenStruct.new(message)) subject.poll end it "deletes the message from the queue" do - expect(poller).to receive(:delete_message).with(message) + expect(subject.queue_poller).to receive(:delete_message).with(queue_message) subject.poll end end context "when an invalid message is yielded" do - let(:message_body) do + subject { ExampleQueuePoller.new(queue_url: queue_url) } + let(:message) { { id: "id-123", status: "unknown-abc" } } + let(:notification) do { - id: "id-123", - status: "unknown-abc", + 'MessageId' => SecureRandom.uuid, + 'Message' => message.to_json, + 'Type' => 'Notification', } end - let(:message) do - message = double - allow(message).to receive(:body) do - {Message: message_body.to_json}.to_json - end - message + let!(:queue_message) do + OpenStruct.new( + body: notification.to_json, + message_id: message_id, + ) end + before(:each) do - allow(poller).to receive(:poll).and_yield(message) - allow(Pheme).to receive(:log) + allow(subject.queue_poller).to receive(:poll).and_yield(queue_message) + allow(subject.queue_poller).to receive(:delete).with(queue_message) + allow(Pheme.logger).to receive(:error) end - subject { ExampleQueuePoller.new(queue_url: queue_url) } - it "logs the error" do subject.poll - expect(Pheme).to have_received(:log).with(:error, "Exception: #<ArgumentError: Unknown message status: unknown-abc>") + expect(Pheme.logger).to have_received(:error) do |error| + expect(error).to be_a(ArgumentError) + expect(error.message).to eq('Unknown message status: unknown-abc') + end end it "does not delete the message from the queue" do - expect(poller).not_to receive(:delete_message) + expect(subject.queue_poller).not_to receive(:delete_message) + subject.poll + end + end + + context "AWS-event message" do + subject { ExampleAwsEventQueuePoller.new(queue_url: queue_url) } + let(:queue_message) { OpenStruct.new(body: { 'Records' => records }.to_json) } + let(:records) do + [{ 'eventVersion' => '2.0', 'eventSource': 'aws:s3' }] + end + before(:each) do + allow(subject.queue_poller).to receive(:poll).and_yield(queue_message) + allow(subject.queue_poller).to receive(:delete).with(queue_message) + end + + it "logs the message" do subject.poll end end end end