spec/queue_poller_spec.rb in pheme-0.0.8 vs spec/queue_poller_spec.rb in pheme-0.0.9
- old
+ new
@@ -1,18 +1,7 @@
describe Pheme::QueuePoller do
let(:queue_url) { "https://sqs.us-east-1.amazonaws.com/whatever" }
- let(:poller) do
- poller = double
- allow(poller).to receive(:poll).with(kind_of(Hash))
- allow(poller).to receive(:parse_message)
- allow(poller).to receive(:before_request)
- poller
- end
- before(:each) do
- use_default_configuration!
- allow(Aws::SQS::QueuePoller).to receive(:new) { poller }
- end
describe ".new" do
context "when initialized with valid params" do
it "does not raise an error" do
expect { ExampleQueuePoller.new(queue_url: "queue_url") }.not_to raise_error
@@ -30,155 +19,208 @@
expect(ExampleQueuePoller.new(queue_url: "queue_url", max_messages: 5).max_messages).to eq(5)
end
end
end
- describe "#parse_message" do
- context "with JSON message" do
- subject { ExampleQueuePoller.new(queue_url: queue_url) }
+ let(:poller) { ExampleQueuePoller.new(queue_url: queue_url, format: format) }
+ let(:message_id) { SecureRandom.uuid }
+ let(:message) { nil }
+ let!(:queue_message) do
+ OpenStruct.new(
+ body: { Message: message }.to_json,
+ message_id: message_id,
+ )
+ end
- let!(:message) { OpenStruct.new({
- body: '{"Message":"{\"test\":\"test\"}"}'
- })}
+ describe "#parse_body" do
+ subject { poller.parse_body(queue_message) }
- it 'should parse the message correctly' do
- expect(subject.parse_message(message).test).to eq("test")
- end
+ context "message is JSON string" do
+ let(:format) { :json }
+ let!(:message) { { test: 'test' }.to_json }
+ its([:test]) { is_expected.to eq('test') }
end
- context "with CSV message" do
- subject { ExampleQueuePoller.new(queue_url: queue_url, format: :csv) }
-
- let!(:message) { OpenStruct.new({
- body:'{"Message":"test,test2\nvalue,value2\nvalue3,value4"}'
- })}
-
- it 'should parse the message correctly' do
- expect(subject.parse_message(message)).to be_a(Array)
- expect(subject.parse_message(message).count).to eq(2)
+ context "message is CSV string" do
+ let(:format) { :csv }
+ let(:expected_message) do
+ [
+ { test1: 'value1', test2: 'value2' },
+ { test1: 'value3', test2: 'value4' },
+ ]
end
+ let(:message) do
+ [
+ %w[test1 test2].join(','),
+ %w[value1 value2].join(','),
+ %w[value3 value4].join(','),
+ ].join("\n")
+ end
+
+ it { is_expected.to have(2).items }
+ it { is_expected.to eq(RecursiveOpenStruct.new({ wrapper: expected_message }, recurse_over_arrays: true).wrapper) }
end
context "with unknown message format" do
- subject { ExampleQueuePoller.new(queue_url: queue_url, format: :invalid_format) }
+ let(:format) { :invalid_format }
+ let(:message) { 'unkonwn' }
- let!(:message) { OpenStruct.new({
- body:'{"Message":"test,test2\nvalue,value2\nvalue3,value4"}'
- })}
-
- it 'should raise an error' do
- expect{ subject.parse_message(message) }.to raise_error
+ it "should raise error" do
+ expect{ subject }.to raise_error(ArgumentError)
end
end
context "with array JSON message" do
- subject { ExampleQueuePoller.new(queue_url: queue_url).parse_message(message) }
- let!(:message) { OpenStruct.new({
- body: '{"Message":"[[{\"test\":\"test\"}]]"}'
- })}
- it 'should parse the message correctly' do
- expect(subject.first.first.test).to eq("test")
- expect(subject).to be_a Array
- expect(subject.first).to be_a Array
- expect(subject.first.first).to be_a RecursiveOpenStruct
+ let(:format) { :json }
+ let(:message) { [[{ test: 'test' }]].to_json }
+
+ it { is_expected.to be_a(Array) }
+ its(:first) { is_expected.to be_a(Array) }
+ its('first.first') { is_expected.to be_a(RecursiveOpenStruct) }
+ it "parses the nested object" do
+ expect(subject.first.first.test).to eq('test')
end
end
end
describe "#poll" do
before(:each) do
module ActiveRecord
class Base
- def self.connection_pool
- end
+ def self.connection_pool; end
end
end
end
context "with connection pool block" do
let(:mock_connection_pool) { double }
+ subject { ExampleQueuePoller.new(queue_url: queue_url, connection_pool_block: true) }
+ let(:message) { { status: 'complete' } }
+ let(:notification) { { 'MessageId' => SecureRandom.uuid, 'Message' => message.to_json, 'Type' => 'Notification' } }
+ let!(:queue_message) do
+ OpenStruct.new(
+ body: notification.to_json,
+ message_id: message_id,
+ )
+ end
before(:each) do
allow(ActiveRecord::Base).to receive(:connection_pool) { mock_connection_pool }
allow(mock_connection_pool).to receive(:with_connection).and_yield
+ allow(subject.queue_poller).to receive(:poll).and_yield(queue_message)
+ allow(subject.queue_poller).to receive(:delete_message).with(queue_message)
end
- subject { ExampleQueuePoller.new(queue_url: queue_url, connection_pool_block: true) }
-
it "uses the connection pool block" do
expect(mock_connection_pool).to receive(:with_connection)
subject.poll
end
end
context "without connection pool block" do
subject { ExampleQueuePoller.new(queue_url: queue_url) }
+ let(:message) { { status: 'complete' } }
+ let(:notification) { { 'MessageId' => SecureRandom.uuid, 'Message' => message.to_json, 'Type' => 'Notification' } }
+ let!(:queue_message) do
+ OpenStruct.new(
+ body: notification.to_json,
+ message_id: message_id,
+ )
+ end
+ before(:each) do
+ allow(subject.queue_poller).to receive(:poll).and_yield(queue_message)
+ allow(subject.queue_poller).to receive(:delete_message).with(queue_message)
+ end
+
it "does not call ActiveRecord" do
expect(ActiveRecord::Base).not_to receive(:connection_pool)
subject.poll
end
end
context "when a valid message is yielded" do
- let(:message_body) do
+ subject { ExampleQueuePoller.new(queue_url: queue_url) }
+ let(:message) { { id: "id-123", status: "complete" } }
+ let(:notification) do
{
- id: "id-123",
- status: "complete",
+ 'MessageId' => SecureRandom.uuid,
+ 'Message' => message.to_json,
+ 'Type' => 'Notification',
}
end
- let(:message) do
- message = double
- allow(message).to receive(:body) do
- {Message: message_body.to_json,}.to_json
- end
- message
+ let!(:queue_message) do
+ OpenStruct.new(
+ body: notification.to_json,
+ message_id: message_id,
+ )
end
+
before(:each) do
- allow(poller).to receive(:poll).and_yield(message)
+ allow(subject.queue_poller).to receive(:poll).and_yield(queue_message)
+ allow(subject.queue_poller).to receive(:delete_message).with(queue_message)
end
- subject { ExampleQueuePoller.new(queue_url: queue_url) }
-
it "handles the message" do
- expect(ExampleMessageHandler).to receive(:new).with(message: RecursiveOpenStruct.new(message_body))
+ expect(ExampleMessageHandler).to receive(:new).with(message: RecursiveOpenStruct.new(message))
subject.poll
end
it "deletes the message from the queue" do
- expect(poller).to receive(:delete_message).with(message)
+ expect(subject.queue_poller).to receive(:delete_message).with(queue_message)
subject.poll
end
end
context "when an invalid message is yielded" do
- let(:message_body) do
+ subject { ExampleQueuePoller.new(queue_url: queue_url) }
+ let(:message) { { id: "id-123", status: "unknown-abc" } }
+ let(:notification) do
{
- id: "id-123",
- status: "unknown-abc",
+ 'MessageId' => SecureRandom.uuid,
+ 'Message' => message.to_json,
+ 'Type' => 'Notification',
}
end
- let(:message) do
- message = double
- allow(message).to receive(:body) do
- {Message: message_body.to_json}.to_json
- end
- message
+ let!(:queue_message) do
+ OpenStruct.new(
+ body: notification.to_json,
+ message_id: message_id,
+ )
end
+
before(:each) do
- allow(poller).to receive(:poll).and_yield(message)
- allow(Pheme).to receive(:log)
+ allow(subject.queue_poller).to receive(:poll).and_yield(queue_message)
+ allow(subject.queue_poller).to receive(:delete).with(queue_message)
+ allow(Pheme.logger).to receive(:error)
end
- subject { ExampleQueuePoller.new(queue_url: queue_url) }
-
it "logs the error" do
subject.poll
- expect(Pheme).to have_received(:log).with(:error, "Exception: #<ArgumentError: Unknown message status: unknown-abc>")
+ expect(Pheme.logger).to have_received(:error) do |error|
+ expect(error).to be_a(ArgumentError)
+ expect(error.message).to eq('Unknown message status: unknown-abc')
+ end
end
it "does not delete the message from the queue" do
- expect(poller).not_to receive(:delete_message)
+ expect(subject.queue_poller).not_to receive(:delete_message)
+ subject.poll
+ end
+ end
+
+ context "AWS-event message" do
+ subject { ExampleAwsEventQueuePoller.new(queue_url: queue_url) }
+ let(:queue_message) { OpenStruct.new(body: { 'Records' => records }.to_json) }
+ let(:records) do
+ [{ 'eventVersion' => '2.0', 'eventSource': 'aws:s3' }]
+ end
+ before(:each) do
+ allow(subject.queue_poller).to receive(:poll).and_yield(queue_message)
+ allow(subject.queue_poller).to receive(:delete).with(queue_message)
+ end
+
+ it "logs the message" do
subject.poll
end
end
end
end