require 'spec_helper' if Sidekiq::VERSION < '4' require 'celluloid' end require 'sidekiq/scheduled' describe "Sidekiq Integration" do describe "Happy Path" do before(:each) do Sidekiq::Testing.fake! QueueBus.dispatch("r1") do subscribe "event_name" do |attributes| QueueBus::Runner1.run(attributes) end end QueueBus::TaskManager.new(false).subscribe! end it "should publish and receive" do Sidekiq::Testing.fake! expect(QueueBus::Runner1.value).to eq(0) QueueBus.publish("event_name", "ok" => true) expect(QueueBus::Runner1.value).to eq(0) QueueBus::Worker.perform_one expect(QueueBus::Runner1.value).to eq(0) QueueBus::Worker.perform_one expect(QueueBus::Runner1.value).to eq(1) end it "should publish and receive" do Sidekiq::Testing.inline! expect(QueueBus::Runner1.value).to eq(0) QueueBus.publish("event_name", "ok" => true) expect(QueueBus::Runner1.value).to eq(1) end end describe "Delayed Publishing" do before(:each) do Timecop.freeze(now) allow(QueueBus).to receive(:generate_uuid).and_return("idfhlkj") end after(:each) do Timecop.return end let(:delayed_attrs) { {"bus_delayed_until" => future.to_i, "bus_id" => "#{now.to_i}-idfhlkj", "bus_app_hostname" => `hostname 2>&1`.strip.sub(/.local/,'')} } let(:bus_attrs) { delayed_attrs.merge({"bus_published_at" => worktime.to_i})} let(:now) { Time.parse("01/01/2013 5:00")} let(:future) { Time.at(now.to_i + 60) } let(:worktime) {Time.at(future.to_i + 1)} it "should add it to Redis" do hash = {:one => 1, "two" => "here", "id" => 12 } event_name = "event_name" QueueBus.publish_at(future, event_name, hash) val = QueueBus.redis { |redis| redis.zrange("schedule", 0, 1) }.first hash = JSON.parse(val) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "QueueBus::Publisher", "bus_event_type"=>"event_name", "two"=>"here", "one"=>1, "id" => 12}.merge(delayed_attrs)) expect(hash["queue"]).to eq("bus_incoming") end it "should move it to the real queue when processing" do hash = {:one => 1, "two" => "here", "id" => 12 } event_name = "event_name" val = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") } expect(val).to eq(nil) QueueBus.publish_at(future, event_name, hash) val = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") } expect(val).to eq(nil) # nothing really added if Sidekiq::VERSION < '4' Sidekiq::Scheduled::Poller.new.poll else Sidekiq::Scheduled::Poller.new.enqueue end val = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") } expect(val).to eq(nil) # nothing added yet # process scheduler in future Timecop.freeze(worktime) do if Sidekiq::VERSION < '4' Sidekiq::Scheduled::Poller.new.poll else Sidekiq::Scheduled::Poller.new.enqueue end val = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") } hash = JSON.parse(val) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "QueueBus::Publisher", "bus_event_type"=>"event_name", "two"=>"here", "one"=>1, "id" => 12}.merge(delayed_attrs)) QueueBus::Publisher.perform(JSON.parse(hash["args"].first)) val = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") } hash = JSON.parse(val) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "QueueBus::Driver", "bus_event_type"=>"event_name", "two"=>"here", "one"=>1, "id" => 12}.merge(bus_attrs)) end end end end