require 'spec_helper'

# Minimal no-op job used where a spec only needs something to enqueue.
class TestWorker < BatchTestJobBase
  def perform
  end
end

# Specs for CanvasSync::JobBatches::Batch: construction, persisted attributes,
# invalidation, per-job success/failure bookkeeping and callback enqueueing.
RSpec.describe CanvasSync::JobBatches::Batch do
  describe '#initialize' do
    subject { described_class }

    it 'creates bid when called without it' do
      expect(subject.new.bid).not_to be_nil
    end

    it 'reuses bid when called with it' do
      batch = subject.new('dayPO5KxuRXXxw')
      expect(batch.bid).to eq('dayPO5KxuRXXxw')
    end
  end

  describe '#description' do
    let(:description) { 'custom description' }

    before do
      subject.description = description
      subject.jobs {}
    end

    it 'sets descriptions' do
      expect(subject.description).to eq(description)
    end

    it 'persists description' do
      expect(CanvasSync::JobBatches::Batch.redis { |r| r.hget("BID-#{subject.bid}", 'description') })
        .to eq(description)
    end
  end

  describe '#callback_queue' do
    let(:callback_queue) { 'custom_queue' }

    before do
      subject.callback_queue = callback_queue
      subject.jobs {}
    end

    it 'sets callback_queue' do
      expect(subject.callback_queue).to eq(callback_queue)
    end

    it 'persists callback_queue' do
      expect(CanvasSync::JobBatches::Batch.redis { |r| r.hget("BID-#{subject.bid}", 'callback_queue') })
        .to eq(callback_queue)
    end
  end

  describe '#jobs' do
    it 'throws error if no block given' do
      expect { subject.jobs }.to raise_error CanvasSync::JobBatches::Batch::NoBlockGivenError
    end

    # Pending examples: no body yet.
    it 'increments to_process (when started)'

    it 'decrements to_process (when finished)'
    # it 'calls process_successful_job to wait for block to finish' do
    #   batch = CanvasSync::JobBatches::Batch.new
    #   expect(CanvasSync::JobBatches::Batch).to receive(:process_successful_job).with(batch.bid)
    #   batch.jobs {}
    # end

    it 'sets Thread.current bid' do
      batch = CanvasSync::JobBatches::Batch.new
      batch.jobs do
        expect(Thread.current[:batch]).to eq(batch)
      end
    end
  end

  describe '#invalidate_all' do
    # Job that only reaches `was_performed` when `valid_within_batch?` is
    # truthy, letting the specs observe whether a batch was invalidated.
    class InvalidatableJob < BatchTestJobBase
      def perform
        return unless valid_within_batch?

        was_performed
      end

      def was_performed; end
    end

    it 'marks batch in redis as invalidated' do
      batch = CanvasSync::JobBatches::Batch.new
      job = InvalidatableJob.new
      allow(job).to receive(:was_performed)

      batch.invalidate_all
      batch.jobs { job.perform }

      expect(job).not_to have_received(:was_performed)
    end

    context 'nested batches' do
      let(:batch_parent) { CanvasSync::JobBatches::Batch.new }
      let(:batch_child_1) { CanvasSync::JobBatches::Batch.new }
      let(:batch_child_2) { CanvasSync::JobBatches::Batch.new }
      let(:job_of_parent) { InvalidatableJob.new }
      let(:job_of_child_1) { InvalidatableJob.new }
      let(:job_of_child_2) { InvalidatableJob.new }

      before do
        allow(job_of_parent).to receive(:was_performed)
        allow(job_of_child_1).to receive(:was_performed)
        allow(job_of_child_2).to receive(:was_performed)
      end

      it 'invalidates all job if parent batch is marked as invalidated' do
        batch_parent.invalidate_all
        batch_parent.jobs do
          [
            job_of_parent.perform,
            batch_child_1.jobs do
              [
                job_of_child_1.perform,
                batch_child_2.jobs { job_of_child_2.perform }
              ]
            end
          ]
        end

        expect(job_of_parent).not_to have_received(:was_performed)
        expect(job_of_child_1).not_to have_received(:was_performed)
        expect(job_of_child_2).not_to have_received(:was_performed)
      end

      it 'invalidates only requested batch' do
        batch_child_2.invalidate_all
        batch_parent.jobs do
          [
            job_of_parent.perform,
            batch_child_1.jobs do
              [
                job_of_child_1.perform,
                batch_child_2.jobs { job_of_child_2.perform }
              ]
            end
          ]
        end

        expect(job_of_parent).to have_received(:was_performed)
        expect(job_of_child_1).to have_received(:was_performed)
        expect(job_of_child_2).not_to have_received(:was_performed)
      end
    end
  end

  describe '#process_failed_job' do
    let(:batch) { CanvasSync::JobBatches::Batch.new }
    let(:bid) { batch.bid }
    let(:jid) { 'ABCD' }

    # Seed the batch hash with a single pending job.
    before { CanvasSync::JobBatches::Batch.redis { |r| r.hset("BID-#{bid}", 'pending', 1) } }

    context 'complete' do
      let(:failed_jid) { 'xxx' }

      it 'tries to call complete callback' do
        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:complete, bid)

        CanvasSync::JobBatches::Batch.process_failed_job(bid, failed_jid)
      end

      it 'add job to failed list' do
        CanvasSync::JobBatches::Batch.process_failed_job(bid, 'failed-job-id')
        CanvasSync::JobBatches::Batch.process_failed_job(bid, failed_jid)

        failed = CanvasSync::JobBatches::Batch.redis { |r| r.smembers("BID-#{bid}-failed") }
        # SMEMBERS makes no ordering guarantee, so assert membership only.
        expect(failed).to contain_exactly('xxx', 'failed-job-id')
      end
    end
  end

  describe '#process_successful_job' do
    let(:batch) { CanvasSync::JobBatches::Batch.new }
    let(:bid) { batch.bid }
    let(:jid) { 'ABCD' }

    # Seed the batch hash with a single pending job.
    before { CanvasSync::JobBatches::Batch.redis { |r| r.hset("BID-#{bid}", 'pending', 1) } }

    context 'complete' do
      before { batch.on(:complete, Object) }
      # before { batch.increment_job_queue(bid) }
      # before { batch.jobs do TestWorker.perform_async end }
      # before { CanvasSync::JobBatches::Batch.process_failed_job(bid, 'failed-job-id') }

      it 'tries to call complete callback' do
        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:complete, bid)

        CanvasSync::JobBatches::Batch.process_failed_job(bid, 'failed-job-id')
      end
    end

    context 'success' do
      before { batch.on(:complete, Object) }

      it 'tries to call complete callback' do
        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:complete, bid).ordered
        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:success, bid).ordered

        CanvasSync::JobBatches::Batch.process_successful_job(bid, jid)
      end

      it 'tries to call success callback after a previous failure' do
        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:complete, bid).ordered
        CanvasSync::JobBatches::Batch.process_failed_job(bid, jid)

        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:complete, bid).ordered
        expect(CanvasSync::JobBatches::Batch).to receive(:enqueue_callbacks).with(:success, bid).ordered
        CanvasSync::JobBatches::Batch.process_successful_job(bid, jid)
      end

      it 'triggers callbacks as expected' do
        ActiveJob::Base.queue_adapter = :sidekiq
        CanvasSync::JobBatches::Batch::Callback.worker_class = CanvasSync::JobBatches::Sidekiq::SidekiqCallbackWorker

        callback_instance = double('SampleCallback')
        expect(SampleCallback).to receive(:new).at_least(1).times.and_return(callback_instance)

        expect(callback_instance).to receive(:on_complete)
        expect(callback_instance).to receive(:on_success)

        batch.on(:complete, SampleCallback)
        batch.on(:success, SampleCallback)

        Sidekiq::Testing.inline! do
          CanvasSync::JobBatches::Batch.process_failed_job(bid, jid)
          CanvasSync::JobBatches::Batch.process_successful_job(bid, jid)
        end
      end

      it 'delays triggering callbacks if keep_open is set' do
        ActiveJob::Base.queue_adapter = :sidekiq
        CanvasSync::JobBatches::Batch::Callback.worker_class = CanvasSync::JobBatches::Sidekiq::SidekiqCallbackWorker

        callback_instance = double('SampleCallback')
        expect(SampleCallback).to receive(:new).at_least(1).times.and_return(callback_instance)

        expect(callback_instance).not_to receive(:on_complete)
        expect(callback_instance).not_to receive(:on_success)

        batch.on(:complete, SampleCallback)
        batch.on(:success, SampleCallback)

        batch.keep_open!

        Sidekiq::Testing.inline! do
          CanvasSync::JobBatches::Batch.process_failed_job(bid, jid)
          CanvasSync::JobBatches::Batch.process_successful_job(bid, jid)
        end

        # Drop the negative expectations above so the same double can now be
        # required to receive the callbacks once the batch is allowed to close.
        RSpec::Mocks.space.proxy_for(callback_instance).reset

        expect(callback_instance).to receive(:on_complete)
        expect(callback_instance).to receive(:on_success)

        Sidekiq::Testing.inline! do
          batch.let_close!
        end
      end

      it 'triggers callbacks once a retried job succeeds' do
        ActiveJob::Base.queue_adapter = :sidekiq

        CanvasSync::JobBatches::Batch.redis { |r| r.hset("BID-#{bid}", 'pending', 0) }

        # Raises on its first run and succeeds on every later run.
        class RetryingJob < BatchTestJobBase
          @@failed = false

          def perform
            unless @@failed
              @@failed = true
              raise "A Failure"
            end
          end
        end

        callback_instance = double('SampleCallback')
        expect(SampleCallback).to receive(:new).at_least(1).times.and_return(callback_instance)

        expect(callback_instance).to receive(:on_complete)
        expect(callback_instance).to receive(:on_success)

        batch.on(:complete, SampleCallback)
        batch.on(:success, SampleCallback)

        batch.jobs do
          RetryingJob.perform_later
        end

        job_def = Sidekiq::Worker.jobs.first
        int_job_class = job_def["class"].constantize

        begin
          int_job_class.process_job(job_def)
        rescue
          # Deliberately ignored: RetryingJob raises on its first run; the
          # drain below performs the run that is expected to succeed.
        end

        Sidekiq::Worker.drain_all
      end

      # NOTE(review): this reads the string key "BID-<bid>-pending", while the
      # other specs in this file keep the counter in the 'pending' field of the
      # "BID-<bid>" hash - confirm this key is what the implementation cleans up.
      it 'cleanups redis key' do
        CanvasSync::JobBatches::Batch.process_successful_job(bid, jid)
        expect(CanvasSync::JobBatches::Batch.redis { |r| r.get("BID-#{bid}-pending") }.to_i).to eq(0)
      end
    end
  end

  describe '#increment_job_queue' do
    let(:batch) { CanvasSync::JobBatches::Batch.new }

    it 'increments pending' do
      batch.jobs do
        TestWorker.perform_async
      end

      pending = CanvasSync::JobBatches::Batch.redis { |r| r.hget("BID-#{batch.bid}", 'pending') }
      expect(pending).to eq('1')
    end
  end

  describe '#enqueue_callbacks' do
    let(:callback) { double('callback') }
    let(:event) { :complete }

    context 'on :success' do
      let(:event) { :success }

      context 'when no callbacks are defined' do
        it 'clears redis keys' do
          batch = CanvasSync::JobBatches::Batch.new
          batch.jobs {}

          expect(CanvasSync::JobBatches::Batch).to receive(:cleanup_redis).with(batch.bid)
          CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)
        end
      end
    end

    context 'when already called' do
      it 'returns and does not enqueue callbacks' do
        batch = CanvasSync::JobBatches::Batch.new
        batch.on(event, SampleCallback)
        # Flag the event as already handled on the batch hash.
        CanvasSync::JobBatches::Batch.redis { |r| r.hset("BID-#{batch.bid}", event, true) }

        expect(batch).not_to receive(:push_callbacks)
        CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)
      end
    end

    context 'With ActiveJob Adapter' do
      # RSpec only supports example-scoped `around` hooks, so no scope is given.
      around do |example|
        CanvasSync::JobBatches::Batch::Callback.worker_class = CanvasSync::JobBatches::Batch::Callback::ActiveJobCallbackWorker
        example.run
      end

      context 'when not yet called' do
        context 'when there is no callback' do
          it 'it returns' do
            batch = CanvasSync::JobBatches::Batch.new

            expect(batch).not_to receive(:push_callbacks)
            CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)
          end
        end

        context 'when callback defined' do
          let(:opts) { { 'a' => 'b' } }

          it 'calls it passing options' do
            ActiveJob::Base.queue_adapter = :test

            batch = CanvasSync::JobBatches::Batch.new
            batch.on(event, SampleCallback, opts)
            batch.jobs {}

            CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)

            expect(CanvasSync::JobBatches::Batch::Callback.worker_class).to have_been_enqueued.with(
              'SampleCallback', event.to_s, opts, batch.bid, nil
            )
          end
        end

        context 'when multiple callbacks are defined' do
          let(:opts) { { 'a' => 'b' } }
          let(:opts2) { { 'b' => 'a' } }

          it 'enqueues each callback passing their options' do
            ActiveJob::Base.queue_adapter = :test

            batch = CanvasSync::JobBatches::Batch.new
            batch.on(event, SampleCallback, opts)
            batch.on(event, SampleCallback2, opts2)
            batch.jobs {}

            CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)

            expect(CanvasSync::JobBatches::Batch::Callback.worker_class).to have_been_enqueued.with(
              'SampleCallback2', event.to_s, opts2, batch.bid, nil
            )
            expect(CanvasSync::JobBatches::Batch::Callback.worker_class).to have_been_enqueued.with(
              'SampleCallback', event.to_s, opts, batch.bid, nil
            )
          end
        end
      end
    end

    context 'With Sidekiq Adapter' do
      # RSpec only supports example-scoped `around` hooks, so no scope is given.
      around do |example|
        CanvasSync::JobBatches::Batch::Callback.worker_class = CanvasSync::JobBatches::Sidekiq::SidekiqCallbackWorker
        example.run
      end

      context 'when not yet called' do
        context 'when there is no callback' do
          it 'it returns' do
            batch = CanvasSync::JobBatches::Batch.new

            expect(batch).not_to receive(:push_callbacks)
            CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)
          end
        end

        context 'when callback defined' do
          let(:opts) { { 'a' => 'b' } }

          it 'calls it passing options' do
            batch = CanvasSync::JobBatches::Batch.new
            batch.on(event, SampleCallback, opts)
            batch.jobs {}

            # Read the worker class from the same constant the hook configures.
            expect(Sidekiq::Client).to receive(:push_bulk).with(
              'class' => CanvasSync::JobBatches::Batch::Callback.worker_class,
              'args' => [['SampleCallback', event.to_s, opts, batch.bid, nil]],
              'queue' => 'default'
            )

            CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)
          end
        end

        context 'when multiple callbacks are defined' do
          let(:opts) { { 'a' => 'b' } }
          let(:opts2) { { 'b' => 'a' } }

          it 'enqueues each callback passing their options' do
            batch = CanvasSync::JobBatches::Batch.new
            batch.on(event, SampleCallback, opts)
            batch.on(event, SampleCallback2, opts2)
            batch.jobs {}

            # Read the worker class from the same constant the hook configures.
            expect(Sidekiq::Client).to receive(:push_bulk).with(
              'class' => CanvasSync::JobBatches::Batch::Callback.worker_class,
              'args' => [
                ['SampleCallback2', event.to_s, opts2, batch.bid, nil],
                ['SampleCallback', event.to_s, opts, batch.bid, nil]
              ],
              'queue' => 'default'
            )

            CanvasSync::JobBatches::Batch.enqueue_callbacks(event, batch.bid)
          end
        end
      end
    end
  end
end