README.mdown in resque-bus-0.3.0 vs README.mdown in resque-bus-0.3.1

- old
+ new

@@ -4,203 +4,230 @@ ### Example Application A can publish an event - # config - ResqueBus.redis = "192.168.1.1:6379" +```ruby +# config +ResqueBus.redis = "192.168.1.1:6379" - # business logic - ResqueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith") - - # or do it later - ResqueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith") +# business logic +ResqueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith") +# or do it later +ResqueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith") +``` + Application B is subscribed to events - # config - ResqueBus.redis = "192.168.1.1:6379" - - # initializer - ResqueBus.dispatch("app_b") do - # processes event on app_b_default queue - # subscribe is short-hand to subscribe to your 'default' queue and this block with process events with the name "user_created" - subscribe "user_created" do |attributes| - NameCount.find_or_create_by_name(attributes["last_name"]).increment! - end - - # processes event on app_b_critical queue - # critical is short-hand to subscribe to your 'critical' queue and this block with process events with the name "user_paid" - critical "user_paid" do |attributes| - CreditCard.charge!(attributes) - end +```ruby +# config +ResqueBus.redis = "192.168.1.1:6379" - # you can pass any queue name you would like to process from as well IE: `banana "peeled" do |attributes|` - - # and regexes work as well. note that with the above configuration along with this regex, - # the following as well as the corresponding block above would both be executed - subscribe /^user_/ do |attributes| - Metrics.record_user_action(attributes["bus_event_type"], attributes["id"]) - end - - # the above all filter on just the event_type, but you can filter on anything - # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type - subscribe "my_key", { "user_id" => :present, "page" => "homepage"} do - Mixpanel.homepage_action!(attributes["action"]) - end - end +# initializer +ResqueBus.dispatch("app_b") do + # processes event on app_b_default queue + # subscribe is short-hand to subscribe to your 'default' queue and this block with process events with the name "user_created" + subscribe "user_created" do |attributes| + NameCount.find_or_create_by_name(attributes["last_name"]).increment! + end + + # processes event on app_b_critical queue + # critical is short-hand to subscribe to your 'critical' queue and this block with process events with the name "user_paid" + critical "user_paid" do |attributes| + CreditCard.charge!(attributes) + end + # you can pass any queue name you would like to process from as well IE: `banana "peeled" do |attributes|` + + # and regexes work as well. note that with the above configuration along with this regex, + # the following as well as the corresponding block above would both be executed + subscribe /^user_/ do |attributes| + Metrics.record_user_action(attributes["bus_event_type"], attributes["id"]) + end + + # the above all filter on just the event_type, but you can filter on anything + # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type + subscribe "my_key", { "user_id" => :present, "page" => "homepage"} do + Mixpanel.homepage_action!(attributes["action"]) + end +end +``` + Applications can also subscribe within classes using the provided `Subscriber` module. - class SimpleSubscriber - include ResqueBus::Subscriber - subscribe :my_method - - def my_method(attributes) - # heavy lifting - end - end +```ruby +class SimpleSubscriber + include ResqueBus::Subscriber + subscribe :my_method + def my_method(attributes) + # heavy lifting + end +end +``` + The following is equivalent to the original initializer and shows more options: - class OtherSubscriber - include ResqueBus::Subscriber - application :app_b +```ruby +class OtherSubscriber + include ResqueBus::Subscriber + application :app_b - subscribe :user_created - subscribe_queue :app_b_critical, :user_paid - subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/ - subscribe :homepage_method, :user_id => :present, :page => "homepage" + subscribe :user_created + subscribe_queue :app_b_critical, :user_paid + subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/ + subscribe :homepage_method, :user_id => :present, :page => "homepage" - def user_created(attributes) - NameCount.find_or_create_by_name(attributes["last_name"]).increment! - end + def user_created(attributes) + NameCount.find_or_create_by_name(attributes["last_name"]).increment! + end - def user_paid(attributes) - CreditCard.charge!(attributes) - end + def user_paid(attributes) + CreditCard.charge!(attributes) + end - def user_action(attributes) - Metrics.record_user_action(attributes["bus_event_type"], attributes["id"]) - end + def user_action(attributes) + Metrics.record_user_action(attributes["bus_event_type"], attributes["id"]) + end - def homepage_method - Mixpanel.homepage_action!(attributes["action"]) - end - end + def homepage_method + Mixpanel.homepage_action!(attributes["action"]) + end +end +``` +Note: This subscribes when this class is loaded, so it needs to be in your load or otherwise referenced/required during app initialization to work properly. +### Commands + +Each app needs to tell Redis about its subscriptions: + + $ rake resquebus:subscribe + The subscription block is run inside a Resque worker which needs to be started for each app. $ rake resquebus:setup resque:work The incoming queue also needs to be processed on a dedicated or all the app servers. $ rake resquebus:driver resque:work If you want retry to work for subscribing apps, you should run resque-scheduler - $ rake resquebus:driver resque:scheduler + $ rake resque:scheduler ### Heartbeat -We've found it useful to have the bus act like cron. Triggering timed jobs throughout the system. Resque Bus calls this a heartbeat. +We've found it useful to have the bus act like `cron`, triggering timed jobs throughout the system. Resque Bus calls this a heartbeat. It uses resque-scheduler to trigger the events. You can enable it in your Rakefile. - # resque.rake - namespace :resque do - task :setup => [:environment] do - ResqueBus.heartbeat! - end - end +```ruby +# resque.rake +namespace :resque do + task :setup => [:environment] do + ResqueBus.heartbeat! + end +end +``` Or add it to your `schedule.yml` directly - resquebus_heartbeat: - cron: "* * * * *" - class: "::ResqueBus::Heartbeat" - queue: resquebus_incoming - description: "I publish a heartbeat_minutes event every minute" +```yaml +resquebus_heartbeat: + cron: "* * * * *" + class: "::ResqueBus::Heartbeat" + queue: resquebus_incoming + description: "I publish a heartbeat_minutes event every minute" +``` It is the equivalent of doing this every minute +```ruby seconds = minutes * (60) hours = minutes / (60) days = minutes / (60*24) now = Time.at(seconds) attributes = {} +now = Time.now +seconds = now.to_i +ResqueBus.publish("hearbeat_minutes", { + "epoch_seconds" => seconds, + "epoch_minutes" => seconds / 1.minute, + "epoch_hours" => seconds / 1.hour, + "epoch_days" => seconds / 1.day, + "minute" => now.min + "hour" => now.hour + "day" => now.day + "month" => now.month + "year" => now.year + "yday" => now.yday + "wday" => now.wday +}) +``` - now = Time.now - seconds = now.to_i - ResqueBus.publish("hearbeat_minutes", { - "epoch_seconds" => seconds, - "epoch_minutes" => seconds / 1.minute, - "epoch_hours" => seconds / 1.hour, - "epoch_days" => seconds / 1.day, - "minute" => now.min - "hour" => now.hour - "day" => now.day - "month" => now.month - "year" => now.year - "yday" => now.yday - "wday" => now.wday - }) - This allows you do something like this: - ResqueBus.dispatch("app_c") do - # runs at 10:20, 11:20, etc - subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 20 do |attributes| - Sitemap.generate! - end - - # runs every five minutes - subscribe "every_five_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes| - next unless attributes["epoch_minutes"] % 5 == 0 - HealthCheck.run! - end - - # runs at 8am on the first of every month - subscribe "new_month_morning", 'bus_event_type' => 'heartbeat_minutes', 'day' => 1, hour' => 8, 'minute' => 0, do |attributes| - next unless attributes["epoch_minutes"] % 5 == 0 - Token.old.expire! - end - end +```ruby +ResqueBus.dispatch("app_c") do + # runs at 10:20, 11:20, etc + subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 20 do |attributes| + Sitemap.generate! + end + + # runs every five minutes + subscribe "every_five_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes| + next unless attributes["epoch_minutes"] % 5 == 0 + HealthCheck.run! + end + + # runs at 8am on the first of every month + subscribe "new_month_morning", 'bus_event_type' => 'heartbeat_minutes', 'day' => 1, hour' => 8, 'minute' => 0, do |attributes| + next unless attributes["epoch_minutes"] % 5 == 0 + Token.old.expire! + end +end +``` - ### Compatibility ResqueBus can live along side another instance of Resque that points at a different Redis server. - - # config - Resque.redis = "192.168.1.0:6379" - ResqueBus.redis = "192.168.1.1:6379" +```ruby +# config +Resque.redis = "192.168.1.0:6379" +ResqueBus.redis = "192.168.1.1:6379" +``` + If no Redis instance is given specifically, ResqueBus will use the Resque one. - # config - Resque.redis = "192.168.1.0:6379" +```ruby +# config +Resque.redis = "192.168.1.0:6379" +``` That will use the default (resque) namespace which can be helpful for using the tooling. Conflict with queue names are unlikely. You can change the namespace if you like though. - # config - Resque.redis = "192.168.1.0:6379" - ResqusBus.redis.namespace = :get_on_the_bus +```ruby +# config +Resque.redis = "192.168.1.0:6379" +ResqusBus.redis.namespace = :get_on_the_bus +``` - ### Local Mode -For development, a local mode is also provided and is specified in the -configuration. +For development, a local mode is also provided and is specified in the configuration. - # config - ResqueBus.local_mode = :standalone - or - ResqueBus.local_mode = :inline +```ruby +# config +ResqueBus.local_mode = :standalone +or +ResqueBus.local_mode = :inline +``` Standalone mode does not require a separate resquebus:driver task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application specific queue. A separate resquebus:work task does still need to be run to process these events @@ -213,9 +240,8 @@ * There are a few spots in the code with TODO notes * Make this not freak out in development without Redis or when Redis is down * We might not actually need to publish in tests * Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines -* A synchronous version will be needed for several use cases. Make it so that an event can go in real-time to one subscriber and still be async to the rest. -* Should this use resque-retry or should they jsut go into the failure queue? +* Allow calling resquebus:setup and resquebus:driver together (append to ENV['QUEUES'], don't replace it) Copyright (c) 2011 Brian Leonard, released under the MIT license