Sha256: cfb5052c00c620cc8bb813492c93648e57add798a42f9bc65f153e8f5d6eef04
Contents?: true
Size: 1.86 KB
Versions: 1
Compression:
Stored size: 1.86 KB
Contents
require 'yaml'
require 'wisper'
require 'sidekiq'
require 'wisper/sidekiq/version'

module Wisper
  # Wisper broadcaster that hands each event to a Sidekiq job instead of
  # calling the subscriber inline.
  #
  # based on Sidekiq 4.x #delay method, which is not enabled by default in Sidekiq 5.x
  # https://github.com/mperham/sidekiq/blob/4.x/lib/sidekiq/extensions/generic_proxy.rb
  # https://github.com/mperham/sidekiq/blob/4.x/lib/sidekiq/extensions/class_methods.rb
  class SidekiqBroadcaster
    # Sidekiq job that rebuilds the broadcast payload from YAML and invokes
    # the event method on the subscriber.
    class Worker
      include ::Sidekiq::Worker

      # Deserializes the payload and calls the event method on the subscriber.
      #
      # @param yml [String] YAML dump of `[subscriber, event, args]`, with an
      #   optional fourth element holding keyword arguments
      # @return [Object] whatever the subscriber's event method returns
      def perform(yml)
        subscriber, event, args, kwargs = load_payload(yml)

        # Three-element payloads carry no keywords, so kwargs is nil for them.
        # The splat is skipped in that case to keep the call identical to the
        # keyword-less form on every Ruby version.
        if kwargs.nil? || kwargs.empty?
          subscriber.public_send(event, *args)
        else
          subscriber.public_send(event, *args, **kwargs)
        end
      end

      private

      # Loads the payload with full object deserialization.
      #
      # NOTE(security): unsafe loading can instantiate arbitrary Ruby objects.
      # It is needed here because the payload contains the subscriber object
      # itself; this is only acceptable while everything able to enqueue jobs
      # for this worker is trusted -- review before exposing the queue.
      #
      # @param yml [String] serialized payload
      # @return [Array] the deserialized payload
      def load_payload(yml)
        # Feature-detect instead of comparing Psych version numbers:
        # `unsafe_load` is used whenever the YAML library provides it.
        if ::YAML.respond_to?(:unsafe_load)
          ::YAML.unsafe_load(yml)
        else
          ::YAML.load(yml)
        end
      end
    end

    # Registers this broadcaster with Wisper under the :sidekiq and :async keys.
    #
    # @return [void]
    def self.register
      Wisper.configure do |config|
        config.broadcaster :sidekiq, SidekiqBroadcaster.new
        config.broadcaster :async, SidekiqBroadcaster.new
      end
    end

    # Enqueues a job that will call `event` on `subscriber`.
    #
    # @param subscriber [Object] receiver of the event; serialized with YAML
    # @param _publisher [Object] unused
    # @param event [String, Symbol] name of the method to call on the subscriber
    # @param args [Array] positional arguments for the event method
    # @param kwargs [Hash] keyword arguments for the event method
    # @return [Object] the result of `perform_in` on the configured worker
    def broadcast(subscriber, _publisher, event, *args, **kwargs)
      options = sidekiq_options(subscriber)
      schedule_options = sidekiq_schedule_options(subscriber, event)

      # Keywords are appended only when present, so broadcasts without them
      # produce exactly the same three-element payload as before.
      payload = [subscriber, event, args]
      payload << kwargs unless kwargs.empty?

      Worker.set(options).perform_in(
        schedule_options.fetch(:delay, 0),
        ::YAML.dump(payload)
      )
    end

    private

    # @param subscriber [Object]
    # @return [Hash] the subscriber's `sidekiq_options`, or an empty hash when
    #   the subscriber does not define them
    def sidekiq_options(subscriber)
      subscriber.respond_to?(:sidekiq_options) ? subscriber.sidekiq_options : {}
    end

    # Resolves the scheduling options for an event. Options stored under the
    # event's name take precedence over the subscriber-wide options.
    #
    # @param subscriber [Object]
    # @param event [String, Symbol]
    # @return [Hash] `{ delay: ... }` or an empty hash
    def sidekiq_schedule_options(subscriber, event)
      return {} unless subscriber.respond_to?(:sidekiq_schedule_options)

      options = subscriber.sidekiq_schedule_options
      event_key = event.to_sym

      if options.key?(event_key)
        delay_option(options[event_key])
      else
        delay_option(options)
      end
    end

    # Extracts the delay from a schedule options hash.
    #
    # @param options [Hash] may contain :perform_in and/or :perform_at
    # @return [Hash] `{ delay: value }`, preferring :perform_in; an empty hash
    #   when neither key is present
    def delay_option(options)
      return {} unless options.key?(:perform_in) || options.key?(:perform_at)

      { delay: options[:perform_in] || options[:perform_at] }
    end
  end
end

Wisper::SidekiqBroadcaster.register
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
wisper-sidekiq-compat-2.0.0 | lib/wisper/sidekiq.rb |