lib/sidekiq_bus/adapter.rb in sidekiq-bus-0.7.0 vs lib/sidekiq_bus/adapter.rb in sidekiq-bus-0.8.0

- old
+ new

@@ -1,35 +1,65 @@ +# frozen_string_literal: true + module QueueBus module Adapters + # The sidekiq adapter for queue-bus. It handles enabling, enqueuing, and + # setting up the heartbeat. class Sidekiq < QueueBus::Adapters::Base def enabled! # know we are using it require 'sidekiq' - #this sidekiq middleware adds in the 'retry' key to the job payload so we ensure sidekiq plays well with resque + # this sidekiq middleware adds in the 'retry' key to the job payload so + # we ensure sidekiq plays well with resque. ::Sidekiq.configure_server do |config| config.client_middleware do |chain| chain.prepend ::SidekiqBus::Middleware::Client::Retry end end + ::QueueBus::Worker.include ::Sidekiq::Worker end def redis(&block) ::Sidekiq.redis(&block) end def enqueue(queue_name, klass, hash) - ::Sidekiq::Client.push('queue' => queue_name, 'class' => klass, 'args' => [hash]) + ::Sidekiq::Client.push('queue' => queue_name, + 'class' => klass, + 'args' => [hash]) end def enqueue_at(epoch_seconds, queue_name, klass, hash) - ::Sidekiq::Client.push('queue' => queue_name, 'class' => klass, 'args' => [hash], 'at' => epoch_seconds) + ::Sidekiq::Client.push('queue' => queue_name, + 'class' => klass, + 'args' => [hash], + 'at' => epoch_seconds) end + # Sets up the heartbeat to be broadcast via sidekiq. Only enable this when + # you have disabled the resque heart beat schedule as well, as having both + # may cause issues. + # + # While this will work so long as every time sidekiq boots it triggers this + # set up. You may consider enabling dynamic schedules to keep all nodes up + # to date if it ever changes. def setup_heartbeat!(queue_name) - # TODO: not sure how to do this or what is means to set this up in Sidekiq - raise NotImplementedError + require 'sidekiq-scheduler' + + ::Sidekiq.set_schedule( + 'sidekiqbus_heartbeat', + every: '1min', + class: ::QueueBus::Worker.name, + args: [ + ::QueueBus::Util.encode('bus_class_proxy' => ::QueueBus::Heartbeat.name) + ], + queue: queue_name, + description: 'Enqueues a heart beat every minute for the queue-bus' + ) + # Must reload the schedule to make it present in memory + ::Sidekiq.reload_schedule! end end end end