spec/subscriber_spec.rb in resque-bus-0.3.7 vs spec/subscriber_spec.rb in resque-bus-0.5.7
- old
+ new
@@ -1,252 +1,270 @@
require 'spec_helper'
-class SubscriberTest1
- include ResqueBus::Subscriber
- @queue = "myqueue"
-
- application :my_thing
- subscribe :thing_filter, :x => "y"
- subscribe :event_sub
-
- def event_sub(attributes)
- ResqueBus::Runner1.run(attributes)
- end
-
- def thing_filter(attributes)
- ResqueBus::Runner2.run(attributes)
- end
-end
+ describe QueueBus::Subscriber do
+ let(:attributes) { {"x" => "y"} }
+ let(:bus_attrs) { {"bus_driven_at" => Time.now.to_i} }
-class SubscriberTest2
- include ResqueBus::Subscriber
- application :test2
- subscribe :test2, "value" => :present
- transform :make_an_int
-
- def self.make_an_int(attributes)
- attributes["value"].to_s.length
- end
-
- def test2(int)
- ResqueBus::Runner1.run("transformed"=>int)
- end
-end
+ before(:each) do
+ class SubscriberTest1
+ include QueueBus::Subscriber
+ @queue = "myqueue"
-module SubModule
- class SubscriberTest3
- include ResqueBus::Subscriber
-
- subscribe_queue :sub_queue1, :test3, :bus_event_type => "the_event"
- subscribe_queue :sub_queue2, :the_event
- subscribe :other, :bus_event_type => "other_event"
-
- def test3(attributes)
- ResqueBus::Runner1.run(attributes)
+ application :my_thing
+ subscribe :thing_filter, :x => "y"
+ subscribe :event_sub
+
+ def event_sub(attributes)
+ QueueBus::Runner1.run(attributes)
+ end
+
+ def thing_filter(attributes)
+ QueueBus::Runner2.run(attributes)
+ end
+ end
+
+ class SubscriberTest2
+ include QueueBus::Subscriber
+ application :test2
+ subscribe :test2, "value" => :present
+ transform :make_an_int
+
+ def self.make_an_int(attributes)
+ attributes["value"].to_s.length
+ end
+
+ def test2(int)
+ QueueBus::Runner1.run("transformed"=>int)
+ end
+ end
+
+ module SubModule
+ class SubscriberTest3
+ include QueueBus::Subscriber
+
+ subscribe_queue :sub_queue1, :test3, :bus_event_type => "the_event"
+ subscribe_queue :sub_queue2, :the_event
+ subscribe :other, :bus_event_type => "other_event"
+
+ def test3(attributes)
+ QueueBus::Runner1.run(attributes)
+ end
+
+ def the_event(attributes)
+ QueueBus::Runner2.run(attributes)
+ end
+ end
+
+ class SubscriberTest4
+ include QueueBus::Subscriber
+
+ subscribe_queue :sub_queue1, :test4
+ end
+ end
+
+ Timecop.freeze
+ QueueBus::TaskManager.new(false).subscribe!
end
-
- def the_event(attributes)
- ResqueBus::Runner2.run(attributes)
- end
- end
-
- class SubscriberTest4
- include ResqueBus::Subscriber
-
- subscribe_queue :sub_queue1, :test4
- end
-end
-module ResqueBus
- describe Subscriber do
- let(:attributes) { {"x" => "y"} }
- let(:bus_attrs) { {"bus_driven_at" => Time.now.to_i} }
-
- before(:each) do
- ResqueBus::TaskManager.new(false).subscribe!
+ after(:each) do
+ Timecop.return
end
-
+
it "should have the application" do
SubscriberTest1.app_key.should == "my_thing"
SubModule::SubscriberTest3.app_key.should == "sub_module"
SubModule::SubscriberTest4.app_key.should == "sub_module"
end
-
+
it "should be able to transform the attributes" do
- dispatcher = ResqueBus.dispatcher_by_key("test2")
+ dispatcher = QueueBus.dispatcher_by_key("test2")
all = dispatcher.subscriptions.all
all.size.should == 1
sub = all.first
sub.queue_name.should == "test2_default"
sub.class_name.should == "SubscriberTest2"
sub.key.should == "SubscriberTest2.test2"
sub.matcher.filters.should == {"value"=>"bus_special_value_present"}
- Driver.perform(attributes.merge("bus_event_type" => "something2", "value"=>"nice"))
+ QueueBus::Driver.perform(attributes.merge("bus_event_type" => "something2", "value"=>"nice"))
- hash = JSON.parse(ResqueBus.redis.lpop("queue:test2_default"))
- hash["class"].should == "SubscriberTest2"
- hash["args"].should == [ {"bus_rider_app_key"=>"test2", "bus_rider_sub_key"=>"SubscriberTest2.test2", "bus_rider_queue" => "test2_default", "bus_rider_class_name"=>"SubscriberTest2",
- "bus_event_type" => "something2", "value"=>"nice", "x"=>"y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 0
- Runner2.value.should == 0
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 1
- Runner2.value.should == 0
-
- Runner1.attributes.should == {"transformed" => 4}
-
-
- Driver.perform(attributes.merge("bus_event_type" => "something2", "value"=>"12"))
-
- hash = JSON.parse(ResqueBus.redis.lpop("queue:test2_default"))
- hash["class"].should == "SubscriberTest2"
- hash["args"].should == [ {"bus_rider_app_key"=>"test2", "bus_rider_sub_key"=>"SubscriberTest2.test2", "bus_rider_queue" => "test2_default", "bus_rider_class_name"=>"SubscriberTest2",
- "bus_event_type" => "something2", "value"=>"12", "x"=>"y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 1
- Runner2.value.should == 0
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 2
- Runner2.value.should == 0
-
- Runner1.attributes.should == {"transformed" => 2}
+ hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:test2_default") })
+ hash["class"].should == "QueueBus::Worker"
+ hash["args"].size.should == 1
+ JSON.parse(hash["args"].first).should eq({"bus_class_proxy" => "SubscriberTest2", "bus_rider_app_key"=>"test2", "bus_rider_sub_key"=>"SubscriberTest2.test2", "bus_rider_queue" => "test2_default", "bus_rider_class_name"=>"SubscriberTest2",
+ "bus_event_type" => "something2", "value"=>"nice", "x"=>"y"}.merge(bus_attrs))
+
+ QueueBus::Runner1.value.should == 0
+ QueueBus::Runner2.value.should == 0
+ QueueBus::Util.constantize(hash["class"]).perform(*hash["args"])
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 0
+
+ QueueBus::Runner1.attributes.should == {"transformed" => 4}
+
+
+ QueueBus::Driver.perform(attributes.merge("bus_event_type" => "something2", "value"=>"12"))
+
+ hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:test2_default") })
+ hash["class"].should == "QueueBus::Worker"
+ hash["args"].size.should == 1
+ JSON.parse(hash["args"].first).should == {"bus_class_proxy" => "SubscriberTest2", "bus_rider_app_key"=>"test2", "bus_rider_sub_key"=>"SubscriberTest2.test2", "bus_rider_queue" => "test2_default", "bus_rider_class_name"=>"SubscriberTest2",
+ "bus_event_type" => "something2", "value"=>"12", "x"=>"y"}.merge(bus_attrs)
+
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 0
+ QueueBus::Util.constantize(hash["class"]).perform(*hash["args"])
+ QueueBus::Runner1.value.should == 2
+ QueueBus::Runner2.value.should == 0
+
+ QueueBus::Runner1.attributes.should == {"transformed" => 2}
end
-
-
+
+
it "should put in a different queue" do
- dispatcher = ResqueBus.dispatcher_by_key("sub_module")
+ dispatcher = QueueBus.dispatcher_by_key("sub_module")
all = dispatcher.subscriptions.all
all.size.should == 4
-
+
sub = all.select{ |s| s.key == "SubModule::SubscriberTest3.test3"}.first
sub.queue_name.should == "sub_queue1"
sub.class_name.should == "SubModule::SubscriberTest3"
sub.key.should == "SubModule::SubscriberTest3.test3"
sub.matcher.filters.should == {"bus_event_type"=>"the_event"}
-
+
sub = all.select{ |s| s.key == "SubModule::SubscriberTest3.the_event"}.first
sub.queue_name.should == "sub_queue2"
sub.class_name.should == "SubModule::SubscriberTest3"
sub.key.should == "SubModule::SubscriberTest3.the_event"
sub.matcher.filters.should == {"bus_event_type"=>"the_event"}
-
+
sub = all.select{ |s| s.key == "SubModule::SubscriberTest3.other"}.first
sub.queue_name.should == "sub_module_default"
sub.class_name.should == "SubModule::SubscriberTest3"
sub.key.should == "SubModule::SubscriberTest3.other"
sub.matcher.filters.should == {"bus_event_type"=>"other_event"}
-
+
sub = all.select{ |s| s.key == "SubModule::SubscriberTest4.test4"}.first
sub.queue_name.should == "sub_queue1"
sub.class_name.should == "SubModule::SubscriberTest4"
sub.key.should == "SubModule::SubscriberTest4.test4"
sub.matcher.filters.should == {"bus_event_type"=>"test4"}
-
- Driver.perform(attributes.merge("bus_event_type" => "the_event"))
- hash = JSON.parse(ResqueBus.redis.lpop("queue:sub_queue1"))
- hash["class"].should == "SubModule::SubscriberTest3"
- hash["args"].should == [ {"bus_rider_app_key"=>"sub_module", "bus_rider_sub_key"=>"SubModule::SubscriberTest3.test3", "bus_rider_queue" => "sub_queue1", "bus_rider_class_name"=>"SubModule::SubscriberTest3",
- "bus_event_type" => "the_event", "x" => "y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 0
- Runner2.value.should == 0
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 1
- Runner2.value.should == 0
-
- hash = JSON.parse(ResqueBus.redis.lpop("queue:sub_queue2"))
- hash["class"].should == "SubModule::SubscriberTest3"
- hash["args"].should == [ {"bus_rider_app_key"=>"sub_module", "bus_rider_sub_key"=>"SubModule::SubscriberTest3.the_event", "bus_rider_queue" => "sub_queue2", "bus_rider_class_name"=>"SubModule::SubscriberTest3",
- "bus_event_type" => "the_event", "x" => "y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 1
- Runner2.value.should == 0
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 1
- Runner2.value.should == 1
+ QueueBus::Driver.perform(attributes.merge("bus_event_type" => "the_event"))
+
+ hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:sub_queue1") })
+ hash["class"].should == "QueueBus::Worker"
+ hash["args"].size.should == 1
+ JSON.parse(hash["args"].first).should == {"bus_class_proxy" => "SubModule::SubscriberTest3", "bus_rider_app_key"=>"sub_module", "bus_rider_sub_key"=>"SubModule::SubscriberTest3.test3", "bus_rider_queue" => "sub_queue1", "bus_rider_class_name"=>"SubModule::SubscriberTest3",
+ "bus_event_type" => "the_event", "x" => "y"}.merge(bus_attrs)
+
+ QueueBus::Runner1.value.should == 0
+ QueueBus::Runner2.value.should == 0
+ QueueBus::Util.constantize(hash["class"]).perform(*hash["args"])
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 0
+
+ hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:sub_queue2") })
+ hash["class"].should == "QueueBus::Worker"
+ hash["args"].size.should == 1
+ JSON.parse(hash["args"].first).should == {"bus_class_proxy" => "SubModule::SubscriberTest3", "bus_rider_app_key"=>"sub_module", "bus_rider_sub_key"=>"SubModule::SubscriberTest3.the_event", "bus_rider_queue" => "sub_queue2", "bus_rider_class_name"=>"SubModule::SubscriberTest3",
+ "bus_event_type" => "the_event", "x" => "y"}.merge(bus_attrs)
+
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 0
+ QueueBus::Util.constantize(hash["class"]).perform(*hash["args"])
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 1
end
-
+
it "should subscribe to default and attributes" do
- dispatcher = ResqueBus.dispatcher_by_key("my_thing")
+ dispatcher = QueueBus.dispatcher_by_key("my_thing")
all = dispatcher.subscriptions.all
-
+
sub = all.select{ |s| s.key == "SubscriberTest1.event_sub"}.first
sub.queue_name.should == "myqueue"
sub.class_name.should == "SubscriberTest1"
sub.key.should == "SubscriberTest1.event_sub"
sub.matcher.filters.should == {"bus_event_type"=>"event_sub"}
-
+
sub = all.select{ |s| s.key == "SubscriberTest1.thing_filter"}.first
sub.queue_name.should == "myqueue"
sub.class_name.should == "SubscriberTest1"
sub.key.should == "SubscriberTest1.thing_filter"
sub.matcher.filters.should == {"x"=>"y"}
-
- Driver.perform(attributes.merge("bus_event_type" => "event_sub"))
- ResqueBus.redis.smembers("queues").should =~ ["myqueue"]
- hash = JSON.parse(ResqueBus.redis.lpop("queue:myqueue"))
- hash["class"].should == "SubscriberTest1"
- hash["args"].should == [ {"bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.thing_filter", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1",
- "bus_event_type" => "event_sub", "x" => "y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 0
- Runner2.value.should == 0
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 0
- Runner2.value.should == 1
-
- hash = JSON.parse(ResqueBus.redis.lpop("queue:myqueue"))
- hash["class"].should == "SubscriberTest1"
- hash["args"].should == [ {"bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.event_sub", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1",
- "bus_event_type" => "event_sub", "x" => "y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 0
- Runner2.value.should == 1
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 1
- Runner2.value.should == 1
-
- Driver.perform(attributes.merge("bus_event_type" => "event_sub_other"))
- ResqueBus.redis.smembers("queues").should =~ ["myqueue"]
-
- hash = JSON.parse(ResqueBus.redis.lpop("queue:myqueue"))
- hash["class"].should == "SubscriberTest1"
- hash["args"].should == [ {"bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.thing_filter", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1",
- "bus_event_type" => "event_sub_other", "x" => "y"}.merge(bus_attrs) ]
-
- Runner1.value.should == 1
- Runner2.value.should == 1
- Util.constantize(hash["class"]).perform(*hash["args"])
- Runner1.value.should == 1
- Runner2.value.should == 2
-
- Driver.perform({"x"=>"z"}.merge("bus_event_type" => "event_sub_other"))
- ResqueBus.redis.smembers("queues").should =~ ["myqueue"]
-
- ResqueBus.redis.lpop("queue:myqueue").should be_nil
+ QueueBus::Driver.perform(attributes.merge("bus_event_type" => "event_sub"))
+ QueueBus.redis { |redis| redis.smembers("queues") }.should =~ ["myqueue"]
+
+ pop1 = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:myqueue") })
+ pop2 = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:myqueue") })
+
+ if JSON.parse(pop1["args"].first)["bus_rider_sub_key"] == "SubscriberTest1.thing_filter"
+ hash1 = pop1
+ hash2 = pop2
+ else
+ hash1 = pop2
+ hash2 = pop1
+ end
+
+ hash1["class"].should == "QueueBus::Worker"
+ JSON.parse(hash1["args"].first).should eq({"bus_class_proxy" => "SubscriberTest1", "bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.thing_filter", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1",
+ "bus_event_type" => "event_sub", "x" => "y"}.merge(bus_attrs))
+
+ QueueBus::Runner1.value.should == 0
+ QueueBus::Runner2.value.should == 0
+ QueueBus::Util.constantize(hash1["class"]).perform(*hash1["args"])
+ QueueBus::Runner1.value.should == 0
+ QueueBus::Runner2.value.should == 1
+
+ hash2["class"].should == "QueueBus::Worker"
+ hash2["args"].size.should == 1
+ JSON.parse(hash2["args"].first).should == {"bus_class_proxy" => "SubscriberTest1", "bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.event_sub", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1",
+ "bus_event_type" => "event_sub", "x" => "y"}.merge(bus_attrs)
+
+ QueueBus::Runner1.value.should == 0
+ QueueBus::Runner2.value.should == 1
+ QueueBus::Util.constantize(hash2["class"]).perform(*hash2["args"])
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 1
+
+ QueueBus::Driver.perform(attributes.merge("bus_event_type" => "event_sub_other"))
+ QueueBus.redis { |redis| redis.smembers("queues") }.should =~ ["myqueue"]
+
+ hash = JSON.parse(QueueBus.redis { |redis| redis.lpop("queue:myqueue") })
+ hash["class"].should == "QueueBus::Worker"
+ hash["args"].size.should == 1
+ JSON.parse(hash["args"].first).should == {"bus_class_proxy" => "SubscriberTest1", "bus_rider_app_key"=>"my_thing", "bus_rider_sub_key"=>"SubscriberTest1.thing_filter", "bus_rider_queue" => "myqueue", "bus_rider_class_name"=>"SubscriberTest1",
+ "bus_event_type" => "event_sub_other", "x" => "y"}.merge(bus_attrs)
+
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 1
+ QueueBus::Util.constantize(hash["class"]).perform(*hash["args"])
+ QueueBus::Runner1.value.should == 1
+ QueueBus::Runner2.value.should == 2
+
+ QueueBus::Driver.perform({"x"=>"z"}.merge("bus_event_type" => "event_sub_other"))
+ QueueBus.redis { |redis| redis.smembers("queues") }.should =~ ["myqueue"]
+
+ QueueBus.redis { |redis| redis.lpop("queue:myqueue") }.should be_nil
end
-
+
describe ".perform" do
let(:attributes) { {"bus_rider_sub_key"=>"SubscriberTest1.event_sub", "bus_locale" => "en", "bus_timezone" => "PST"} }
it "should call the method based on key" do
SubscriberTest1.any_instance.should_receive(:event_sub)
SubscriberTest1.perform(attributes)
end
it "should set the timezone and locale if present" do
defined?(I18n).should be_nil
- Time.respond_to?(:zone).should be_false
+ Time.respond_to?(:zone).should eq(false)
stub_const("I18n", Class.new)
I18n.should_receive(:locale=).with("en")
Time.should_receive(:zone=).with("PST")
-
+
SubscriberTest1.any_instance.should_receive(:event_sub)
SubscriberTest1.perform(attributes)
end
end
end
-end
\ No newline at end of file