require 'spec_helper' describe QueueBus::Subscriber do let(:attributes) { {"x" => "y"} } let(:bus_attrs) { {"bus_driven_at" => Time.now.to_i} } before(:each) do class SubscriberTest1 include QueueBus::Subscriber @queue = "myqueue" application :my_thing subscribe :thing_filter, :x => "y" subscribe :event_sub def event_sub(attributes) QueueBus::Runner1.run(attributes) end def thing_filter(attributes) QueueBus::Runner2.run(attributes) end end class SubscriberTest2 include QueueBus::Subscriber application :test2 subscribe :test2, "value" => :present transform :make_an_int def self.make_an_int(attributes) attributes["value"].to_s.length end def test2(int) QueueBus::Runner1.run("transformed"=>int) end end module SubModule class SubscriberTest3 include QueueBus::Subscriber subscribe_queue :sub_queue1, :test3, :bus_event_type => "the_event" subscribe_queue :sub_queue2, :the_event subscribe :other, :bus_event_type => "other_event" def test3(attributes) QueueBus::Runner1.run(attributes) end def the_event(attributes) QueueBus::Runner2.run(attributes) end end class SubscriberTest4 include QueueBus::Subscriber subscribe_queue :sub_queue1, :test4 end end Timecop.freeze QueueBus::TaskManager.new(false).subscribe! end after(:each) do Timecop.return end it "should have the application" do expect(SubscriberTest1.app_key).to eq("my_thing") expect(SubModule::SubscriberTest3.app_key).to eq("sub_module") expect(SubModule::SubscriberTest4.app_key).to eq("sub_module") end it "should be able to transform the attributes" do dispatcher = QueueBus.dispatcher_by_key("test2") all = dispatcher.subscriptions.all expect(all.size).to eq(1) sub = all.first expect(sub.queue_name).to eq("test2_default") expect(sub.class_name).to eq("SubscriberTest2") expect(sub.key).to eq("SubscriberTest2.test2") expect(sub.matcher.filters).to eq({"value"=>"bus_special_value_present"}) QueueBus::Driver.perform(attributes.merge("bus_event_type" => "something2", "value"=>"nice")) hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:test2_default") }) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "SubscriberTest2", "bus_rider_app_key"=>"test2", "bus_rider_sub_key"=>"SubscriberTest2.test2", "bus_rider_queue" => "test2_default", "bus_rider_class_name"=>"SubscriberTest2", "bus_event_type" => "something2", "value"=>"nice", "x"=>"y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(0) expect(QueueBus::Runner2.value).to eq(0) QueueBus::Util.constantize(hash["class"]).perform(*hash["args"]) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(0) expect(QueueBus::Runner1.attributes).to eq({"transformed" => 4}) QueueBus::Driver.perform(attributes.merge("bus_event_type" => "something2", "value"=>"12")) hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:test2_default") }) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "SubscriberTest2", "bus_rider_app_key"=>"test2", "bus_rider_sub_key"=>"SubscriberTest2.test2", "bus_rider_queue" => "test2_default", "bus_rider_class_name"=>"SubscriberTest2", "bus_event_type" => "something2", "value"=>"12", "x"=>"y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(0) QueueBus::Util.constantize(hash["class"]).perform(*hash["args"]) expect(QueueBus::Runner1.value).to eq(2) expect(QueueBus::Runner2.value).to eq(0) expect(QueueBus::Runner1.attributes).to eq({"transformed" => 2}) end it "should put in a different queue" do dispatcher = QueueBus.dispatcher_by_key("sub_module") all = dispatcher.subscriptions.all expect(all.size).to eq(4) sub = all.select{ |s| s.key == "SubModule::SubscriberTest3.test3"}.first expect(sub.queue_name).to eq("sub_queue1") expect(sub.class_name).to eq("SubModule::SubscriberTest3") expect(sub.key).to eq("SubModule::SubscriberTest3.test3") expect(sub.matcher.filters).to eq({"bus_event_type"=>"the_event"}) sub = all.select{ |s| s.key == "SubModule::SubscriberTest3.the_event"}.first expect(sub.queue_name).to eq("sub_queue2") expect(sub.class_name).to eq("SubModule::SubscriberTest3") expect(sub.key).to eq("SubModule::SubscriberTest3.the_event") expect(sub.matcher.filters).to eq({"bus_event_type"=>"the_event"}) sub = all.select{ |s| s.key == "SubModule::SubscriberTest3.other"}.first expect(sub.queue_name).to eq("sub_module_default") expect(sub.class_name).to eq("SubModule::SubscriberTest3") expect(sub.key).to eq("SubModule::SubscriberTest3.other") expect(sub.matcher.filters).to eq({"bus_event_type"=>"other_event"}) sub = all.select{ |s| s.key == "SubModule::SubscriberTest4.test4"}.first expect(sub.queue_name).to eq("sub_queue1") expect(sub.class_name).to eq("SubModule::SubscriberTest4") expect(sub.key).to eq("SubModule::SubscriberTest4.test4") expect(sub.matcher.filters).to eq({"bus_event_type"=>"test4"}) QueueBus::Driver.perform(attributes.merge("bus_event_type" => "the_event")) hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:sub_queue1") }) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "SubModule::SubscriberTest3", "bus_rider_app_key"=>"sub_module", "bus_rider_sub_key"=>"SubModule::SubscriberTest3.test3", "bus_rider_queue" => "sub_queue1", "bus_rider_class_name"=>"SubModule::SubscriberTest3", "bus_event_type" => "the_event", "x" => "y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(0) expect(QueueBus::Runner2.value).to eq(0) QueueBus::Util.constantize(hash["class"]).perform(*hash["args"]) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(0) hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:sub_queue2") }) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "SubModule::SubscriberTest3", "bus_rider_app_key"=>"sub_module", "bus_rider_sub_key"=>"SubModule::SubscriberTest3.the_event", "bus_rider_queue" => "sub_queue2", "bus_rider_class_name"=>"SubModule::SubscriberTest3", "bus_event_type" => "the_event", "x" => "y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(0) QueueBus::Util.constantize(hash["class"]).perform(*hash["args"]) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(1) end it "should subscribe to default and attributes" do dispatcher = QueueBus.dispatcher_by_key("my_thing") all = dispatcher.subscriptions.all sub = all.select{ |s| s.key == "SubscriberTest1.event_sub"}.first expect(sub.queue_name).to eq("myqueue") expect(sub.class_name).to eq("SubscriberTest1") expect(sub.key).to eq("SubscriberTest1.event_sub") expect(sub.matcher.filters).to eq({"bus_event_type"=>"event_sub"}) sub = all.select{ |s| s.key == "SubscriberTest1.thing_filter"}.first expect(sub.queue_name).to eq("myqueue") expect(sub.class_name).to eq("SubscriberTest1") expect(sub.key).to eq("SubscriberTest1.thing_filter") expect(sub.matcher.filters).to eq({"x"=>"y"}) QueueBus::Driver.perform(attributes.merge("bus_event_type" => "event_sub")) expect(QueueBus.redis { |redis| redis.smembers("queues") }).to match_array(["myqueue"]) pop1 = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:myqueue") }) pop2 = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:myqueue") }) if JSON.parse(pop1["args"].first)["bus_rider_sub_key"] == "SubscriberTest1.thing_filter" hash1 = pop1 hash2 = pop2 else hash1 = pop2 hash2 = pop1 end expect(hash1["class"]).to eq("QueueBus::Worker") expect(JSON.parse(hash1["args"].first)).to eq({"bus_class_proxy" => "SubscriberTest1", "bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.thing_filter", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1", "bus_event_type" => "event_sub", "x" => "y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(0) expect(QueueBus::Runner2.value).to eq(0) QueueBus::Util.constantize(hash1["class"]).perform(*hash1["args"]) expect(QueueBus::Runner1.value).to eq(0) expect(QueueBus::Runner2.value).to eq(1) expect(hash2["class"]).to eq("QueueBus::Worker") expect(hash2["args"].size).to eq(1) expect(JSON.parse(hash2["args"].first)).to eq({"bus_class_proxy" => "SubscriberTest1", "bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.event_sub", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1", "bus_event_type" => "event_sub", "x" => "y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(0) expect(QueueBus::Runner2.value).to eq(1) QueueBus::Util.constantize(hash2["class"]).perform(*hash2["args"]) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(1) QueueBus::Driver.perform(attributes.merge("bus_event_type" => "event_sub_other")) expect(QueueBus.redis { |redis| redis.smembers("queues") }).to match_array(["myqueue"]) hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:myqueue") }) expect(hash["class"]).to eq("QueueBus::Worker") expect(hash["args"].size).to eq(1) expect(JSON.parse(hash["args"].first)).to eq({"bus_class_proxy" => "SubscriberTest1", "bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.thing_filter", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1", "bus_event_type" => "event_sub_other", "x" => "y"}.merge(bus_attrs)) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(1) QueueBus::Util.constantize(hash["class"]).perform(*hash["args"]) expect(QueueBus::Runner1.value).to eq(1) expect(QueueBus::Runner2.value).to eq(2) QueueBus::Driver.perform({"x"=>"z"}.merge("bus_event_type" => "event_sub_other")) expect(QueueBus.redis { |redis| redis.smembers("queues") }).to match_array(["myqueue"]) expect(QueueBus.redis { |redis| redis.lpop("queue:myqueue") }).to be_nil end describe ".perform" do let(:attributes) { {"bus_rider_sub_key"=>"SubscriberTest1.event_sub", "bus_locale" => "en", "bus_timezone" => "PST"} } it "should call the method based on key" do expect_any_instance_of(SubscriberTest1).to receive(:event_sub) SubscriberTest1.perform(attributes) end it "should set the timezone and locale if present" do expect(defined?(I18n)).to be_nil expect(Time.respond_to?(:zone)).to eq(false) stub_const("I18n", Class.new) expect(I18n).to receive(:locale=).with("en") expect(Time).to receive(:zone=).with("PST") expect_any_instance_of(SubscriberTest1).to receive(:event_sub) SubscriberTest1.perform(attributes) end end end