## Resque Bus

This gem uses Redis and Resque to allow simple asynchronous communication between apps.

### Install

To install, include the 'resque-bus' gem and add the following to your Rakefile:

```ruby
require "resque_bus/tasks"
```

### Example

Application A can publish an event:

```ruby
# config
ResqueBus.redis = "192.168.1.1:6379"

# business logic
ResqueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

# or do it later
ResqueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")
```

Application B is subscribed to events:

```ruby
# config
ResqueBus.redis = "192.168.1.1:6379"

# initializer
ResqueBus.dispatch("app_b") do
  # processes event on app_b_default queue
  # subscribe is short-hand to subscribe to your 'default' queue and this block will process events with the name "user_created"
  subscribe "user_created" do |attributes|
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  # processes event on app_b_critical queue
  # critical is short-hand to subscribe to your 'critical' queue and this block will process events with the name "user_paid"
  critical "user_paid" do |attributes|
    CreditCard.charge!(attributes)
  end

  # you can pass any queue name you would like to process from as well, e.g.: `banana "peeled" do |attributes|`

  # and regexes work as well.
  # note that with the above configuration along with this regex,
  # the following as well as the corresponding block above would both be executed
  subscribe /^user_/ do |attributes|
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  # the above all filter on just the event_type, but you can filter on anything
  # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type
  subscribe "my_key", { "user_id" => :present, "page" => "homepage" } do |attributes|
    Mixpanel.homepage_action!(attributes["action"])
  end
end
```

Applications can also subscribe within classes using the provided `Subscriber` module.

```ruby
class SimpleSubscriber
  include ResqueBus::Subscriber
  subscribe :my_method

  def my_method(attributes)
    # heavy lifting
  end
end
```

The following is equivalent to the original initializer and shows more options:

```ruby
class OtherSubscriber
  include ResqueBus::Subscriber
  application :app_b

  subscribe :user_created
  subscribe_queue :app_b_critical, :user_paid
  subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
  subscribe :homepage_method, :user_id => :present, :page => "homepage"

  def user_created(attributes)
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  def user_paid(attributes)
    CreditCard.charge!(attributes)
  end

  def user_action(attributes)
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  def homepage_method(attributes)
    Mixpanel.homepage_action!(attributes["action"])
  end
end
```

Note: This subscribes when the class is loaded, so the class needs to be in your load path or otherwise referenced/required during app initialization to work properly.

### Commands

Each app needs to tell Redis about its subscriptions:

    $ rake resquebus:subscribe

The subscription block is run inside a Resque worker, which needs to be started for each app.

    $ rake resquebus:setup resque:work

The incoming queue also needs to be processed, either on a dedicated server or on all the app servers.


    $ rake resquebus:driver resque:work

If you want retry to work for subscribing apps, you should run resque-scheduler:

    $ rake resque:scheduler

### Heartbeat

We've found it useful to have the bus act like `cron`, triggering timed jobs throughout the system. Resque Bus calls this a heartbeat. It uses resque-scheduler to trigger the events. You can enable it in your Rakefile.

```ruby
# resque.rake
namespace :resque do
  task :setup => [:environment] do
    ResqueBus.heartbeat!
  end
end
```

Or add it to your `schedule.yml` directly:

```yaml
resquebus_heartbeat:
  cron: "* * * * *"
  class: "::ResqueBus::Heartbeat"
  queue: resquebus_incoming
  description: "I publish a heartbeat_minutes event every minute"
```

It is the equivalent of doing this every minute:

```ruby
now = Time.now
seconds = now.to_i

ResqueBus.publish("heartbeat_minutes", {
  "epoch_seconds" => seconds,
  "epoch_minutes" => seconds / 1.minute,
  "epoch_hours"   => seconds / 1.hour,
  "epoch_days"    => seconds / 1.day,
  "minute"        => now.min,
  "hour"          => now.hour,
  "day"           => now.day,
  "month"         => now.month,
  "year"          => now.year,
  "yday"          => now.yday,
  "wday"          => now.wday
})
```

This allows you to do something like this:

```ruby
ResqueBus.dispatch("app_c") do
  # runs at 10:20, 11:20, etc
  subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 20 do |attributes|
    Sitemap.generate!
  end

  # runs every five minutes
  subscribe "every_five_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    HealthCheck.run!
  end

  # runs at 8am on the first of every month
  subscribe "new_month_morning", 'bus_event_type' => 'heartbeat_minutes', 'day' => 1, 'hour' => 8, 'minute' => 0 do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    Token.old.expire!
  end
end
```

### Compatibility

ResqueBus can live alongside another instance of Resque that points at a different Redis server.

```ruby
# config
Resque.redis = "192.168.1.0:6379"
ResqueBus.redis = "192.168.1.1:6379"
```

If no Redis instance is given specifically, ResqueBus will use the Resque one.

```ruby
# config
Resque.redis = "192.168.1.0:6379"
```

That will use the default (resque) namespace, which can be helpful for using the tooling. Conflicts with queue names are unlikely. You can change the namespace if you like, though.

```ruby
# config
Resque.redis = "192.168.1.0:6379"
ResqueBus.redis.namespace = :get_on_the_bus
```

### Local Mode

For development, a local mode is also provided and is specified in the configuration.

```ruby
# config
ResqueBus.local_mode = :standalone
# or
ResqueBus.local_mode = :inline
```

Standalone mode does not require a separate `resquebus:driver` task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application-specific queue. A separate `resquebus:work` task does still need to be run to process these events.

Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.

### TODO

* There are a few spots in the code with TODO notes
* Make this not freak out in development without Redis or when Redis is down
* We might not actually need to publish in tests
* Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines
* Allow calling resquebus:setup and resquebus:driver together (append to ENV['QUEUES'], don't replace it)

Copyright (c) 2011 Brian Leonard, released under the MIT license
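
As a closing illustration of the attribute filters used in the Example and Heartbeat sections, here is a minimal plain-Ruby sketch of how such a filter could be evaluated against an event's attributes. It is not the gem's implementation: the `matches?` helper is hypothetical, and it assumes that plain values are compared by their string form, that regexes are tested against the string form of the value, and that `:present` means the key has a non-empty value.

```ruby
# Hypothetical helper for illustration only; not part of ResqueBus.
# Returns true when every entry in the filter is satisfied by the attributes.
def matches?(filter, attributes)
  filter.all? do |key, expected|
    value = attributes[key.to_s]
    case expected
    when :present then !value.nil? && !value.to_s.strip.empty?
    when Regexp   then !value.nil? && expected.match?(value.to_s)
    else               value.to_s == expected.to_s
    end
  end
end

# A heartbeat-style event, shaped like the attributes published every minute.
heartbeat = { "bus_event_type" => "heartbeat_minutes", "minute" => 20, "hour" => 10 }

once_an_hour = { "bus_event_type" => "heartbeat_minutes", "minute" => 20 }
user_events  = { "bus_event_type" => /^user_/ }
homepage     = { "user_id" => :present, "page" => "homepage" }

puts matches?(once_an_hour, heartbeat) # prints true
puts matches?(user_events, heartbeat)  # prints false
puts matches?(homepage, heartbeat)     # prints false
```

Under these assumptions, every key in the filter must be satisfied for the subscription block to run, which is why the `once_an_hour` filter only fires for the heartbeat whose `minute` is 20.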