## Resque Bus

This gem provides an adapter for Resque for use in the [queue-bus](https://github.com/queue-bus/queue-bus) system.
It uses Redis and the Resque that you are already using to allow simple asynchronous communication between apps.

### Install

To install, include the 'resque-bus' gem and add the following to your Rakefile:

```ruby
require "resque_bus/tasks"
```

### Example

Application A can publish an event

```ruby
# pick an adapter
require 'resque-bus' # (or other adapter)

# business logic
QueueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

# or do it later
QueueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")
```

Application B is subscribed to events

```ruby
# pick an adapter
require 'resque-bus' # (or other adapter)

# initializer
QueueBus.dispatch("app_b") do
  # processes event on app_b_default queue
  # subscribe is short-hand to subscribe to your 'default' queue and this block will process events with the name "user_created"
  subscribe "user_created" do |attributes|
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  # processes event on app_b_critical queue
  # critical is short-hand to subscribe to your 'critical' queue and this block will process events with the name "user_paid"
  critical "user_paid" do |attributes|
    CreditCard.charge!(attributes)
  end

  # you can pass any queue name you would like to process from as well, e.g. `banana "peeled" do |attributes|`

  # and regexes work as well. note that with the above configuration along with this regex,
  # the following as well as the corresponding block above would both be executed
  subscribe /^user_/ do |attributes|
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  # the above all filter on just the event_type, but you can filter on anything
  # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type
  subscribe "my_key", { "user_id" => :present, "page" => "homepage" } do |attributes|
    Mixpanel.homepage_action!(attributes["action"])
  end
end
```

Applications can also subscribe within classes using the provided `Subscriber` module.

```ruby
class SimpleSubscriber
  include QueueBus::Subscriber
  subscribe :my_method

  def my_method(attributes)
    # heavy lifting
  end
end
```

The following is equivalent to the original initializer and shows more options:

```ruby
class OtherSubscriber
  include QueueBus::Subscriber
  application :app_b

  subscribe :user_created
  subscribe_queue :app_b_critical, :user_paid
  subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
  subscribe :homepage_method, :user_id => :present, :page => "homepage"

  def user_created(attributes)
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  def user_paid(attributes)
    CreditCard.charge!(attributes)
  end

  def user_action(attributes)
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  def homepage_method(attributes)
    Mixpanel.homepage_action!(attributes["action"])
  end
end
```

Note: This subscribes when the class is loaded, so the class needs to be in your load path or otherwise referenced/required during app initialization to work properly.

### Commands

Each app needs to tell Redis about its subscriptions:

    $ rake queuebus:subscribe

The subscription block is run inside a Resque worker, which needs to be started for each app.

    $ rake queuebus:setup resque:work

The incoming queue also needs to be processed, either on a dedicated server or on all the app servers.

    $ rake queuebus:driver resque:work

If you want retry to work for subscribing apps, you should run resque-scheduler

    $ rake resque:scheduler

### Heartbeat

We've found it useful to have the bus act like `cron`, triggering timed jobs throughout the system. Resque Bus calls this a heartbeat.
It uses resque-scheduler to trigger the events. You can enable it in your Rakefile.

```ruby
# resque.rake
namespace :resque do
  task :setup => [:environment] do
    QueueBus.heartbeat!
  end
end
```

Or add it to your `schedule.yml` directly

```yaml
resquebus_heartbeat:
  cron: "* * * * *"
  class: "::QueueBus::Heartbeat"
  queue: bus_incoming
  description: "I publish a heartbeat_minutes event every minute"
```

It is the equivalent of doing this every minute

```ruby
now = Time.now
seconds = now.to_i # seconds since the Unix epoch

QueueBus.publish("heartbeat_minutes", {
  "epoch_seconds" => seconds,
  "epoch_minutes" => seconds / 60,             # whole minutes since the epoch
  "epoch_hours"   => seconds / (60 * 60),      # whole hours since the epoch
  "epoch_days"    => seconds / (60 * 60 * 24), # whole days since the epoch
  "minute"        => now.min,
  "hour"          => now.hour,
  "day"           => now.day,
  "month"         => now.month,
  "year"          => now.year,
  "yday"          => now.yday,
  "wday"          => now.wday
})
```

This allows you to do something like this:

```ruby
QueueBus.dispatch("app_c") do
  # runs at 10:20, 11:20, etc
  subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 20 do |attributes|
    Sitemap.generate!
  end

  # runs every five minutes
  subscribe "every_five_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    HealthCheck.run!
  end

  # runs at 8am on the first of every month
  subscribe "new_month_morning", 'bus_event_type' => 'heartbeat_minutes', 'day' => 1, 'hour' => 8, 'minute' => 0 do |attributes|
    Token.old.expire!
  end
end
```

### Local Mode

For development, a local mode is provided and is specified in the configuration.

```ruby
# config
QueueBus.local_mode = :standalone
# or
QueueBus.local_mode = :inline
```

Standalone mode does not require a separate `queuebus:driver` task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application-specific queue. A separate queuebus:work task does still need to be run to process these events.

Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.

You can also say `QueueBus.local_mode = :suppress` to turn off publishing altogether.
This can be helpful inside some sort of migration, for example.

### TODO

* Replace local modes with adapters
* Make this not freak out in development without Redis or when Redis is down
* We might not actually need to publish in tests
* Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines
* Allow calling `queuebus:setup` and `queuebus:driver` together (append to `ENV['QUEUES']`, don't replace it)
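
As a rough illustration of the last TODO item, appending to `ENV['QUEUES']` instead of replacing it might look like the sketch below. `append_queues` is a hypothetical helper, not part of resque-bus or queue-bus, and the queue names are only examples taken from earlier in this README. It assumes the comma-separated `QUEUES` format that Resque's `resque:work` task reads.

```ruby
# Hypothetical helper (not part of resque-bus): merge extra queue names into
# a comma-separated QUEUES string, keeping the names that are already there.
def append_queues(existing, extra)
  # Tolerate a missing value, stray whitespace, and empty entries.
  current = existing.to_s.split(",").map(&:strip).reject(&:empty?)
  # Preserve the original order and drop duplicates.
  (current + Array(extra).map(&:to_s)).uniq.join(",")
end

# Example queue names, chosen for illustration only.
ENV["QUEUES"] = append_queues(ENV["QUEUES"], ["app_b_default", "bus_incoming"])
```

Keeping existing entries first means a worker started with its own `QUEUES` value would still process those queues in the order it was given.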