Sha256: 112ad0ff84f7e564dcc49976d8c1874bb1666a90372a659a9501f03c17e82ba7
Contents?: true
Size: 1.88 KB
Versions: 2
Compression:
Stored size: 1.88 KB
Contents
require 'yaml' require 'wisper' require 'sidekiq' require 'wisper/sidekiq/version' module Wisper # based on Sidekiq 4.x #delay method, which is not enabled by default in Sidekiq 5.x # https://github.com/mperham/sidekiq/blob/4.x/lib/sidekiq/extensions/generic_proxy.rb # https://github.com/mperham/sidekiq/blob/4.x/lib/sidekiq/extensions/class_methods.rb class SidekiqBroadcaster class Worker include ::Sidekiq::Worker def perform(yml) (subscriber, event, args, kwargs) = if Psych::VERSION.to_i >= 4 ::YAML.unsafe_load(yml) else ::YAML.load(yml) end subscriber.public_send(event, *args, **kwargs) end end def self.register Wisper.configure do |config| config.broadcaster :sidekiq, SidekiqBroadcaster.new config.broadcaster :async, SidekiqBroadcaster.new end end def broadcast(subscriber, _publisher, event, *args, **kwargs) options = sidekiq_options(subscriber) schedule_options = sidekiq_schedule_options(subscriber, event) Worker.set(options).perform_in( schedule_options.fetch(:delay, 0), ::YAML.dump([subscriber, event, args, kwargs]) ) end private def sidekiq_options(subscriber) subscriber.respond_to?(:sidekiq_options) ? subscriber.sidekiq_options : {} end def sidekiq_schedule_options(subscriber, event) return {} unless subscriber.respond_to?(:sidekiq_schedule_options) options = subscriber.sidekiq_schedule_options if options.has_key?(event.to_sym) delay_option(options[event.to_sym]) else delay_option(options) end end def delay_option(options) return {} unless options.key?(:perform_in) || options.key?(:perform_at) { delay: options[:perform_in] || options[:perform_at] } end end end Wisper::SidekiqBroadcaster.register
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
wisper-sidekiq-compat-2.3.0 | lib/wisper/sidekiq.rb |
wisper-sidekiq-compat-2.1.0 | lib/wisper/sidekiq.rb |