Sha256: 17aeb8c5079f87cb1a14ade2aac79b9b842487fd7c1d0942a0d4a1eafa68f4d7
Contents?: true
Size: 1.93 KB
Versions: 1
Compression:
Stored size: 1.93 KB
Contents
require 'bunny'
require 'delegate'

module Basquiat
  module Adapters
    # The RabbitMQ adapter for Basquiat.
    #
    # Keeps the registered event handlers in an {Events} collection and lazily
    # builds the connection, session and requeue strategy from the adapter's
    # configuration object.
    class RabbitMq < Basquiat::Adapters::Base
      using Basquiat::HashRefinements

      # Avoid superclass mismatch errors: these files are required inside the
      # class body, i.e. only after RabbitMq (and its superclass) is defined.
      require 'basquiat/adapters/rabbitmq/events'
      require 'basquiat/adapters/rabbitmq/message'
      require 'basquiat/adapters/rabbitmq/configuration'
      require 'basquiat/adapters/rabbitmq/connection'
      require 'basquiat/adapters/rabbitmq/session'
      require 'basquiat/adapters/rabbitmq/requeue_strategies'

      # Runs the base adapter initialization and then installs an empty
      # {Events} collection as the handler registry.
      def initialize
        super
        @procs = Events.new
      end

      # Memoizes the adapter {Configuration} and merges the user supplied
      # adapter options (taken from Basquiat.configuration) into it.
      #
      # @return whatever Configuration#merge_user_options returns
      #   (presumably the merged options hash - confirm against Configuration)
      def base_options
        @configuration ||= Configuration.new
        @configuration.merge_user_options(Basquiat.configuration.adapter_options)
      end

      # Registers the handler to be invoked for +event_name+.
      #
      # @param event_name the key under which the handler is stored
      # @param proc [#call] the handler; it is called with the received message
      def subscribe_to(event_name, proc)
        procs[event_name] = proc
      end

      # Publishes +message+ for +event+ through the current session, inside the
      # connection's network failure handler.
      #
      # @param event the event to publish
      # @param message the payload handed to Session#publish
      # @param persistent when falsy, the adapter disconnects right after
      #   publishing; defaults to options[:publisher][:persistent]
      # @param props [Hash] extra properties forwarded to Session#publish
      def publish(event, message, persistent: options[:publisher][:persistent], props: {})
        connection.with_network_failure_handler do
          session.publish(event, message, props)
          disconnect unless persistent
        end
      end

      # Binds a queue for every registered key and subscribes to the session.
      # Each received message is run through the requeue strategy, which
      # yields to the handler registered for the message's routing key.
      #
      # @param block forwarded as-is to Session#subscribe
      #   (presumably whether the call blocks the current thread - confirm)
      def listen(block: true)
        connection.with_network_failure_handler do
          procs.keys.each { |key| session.bind_queue(key) }
          session.subscribe(block) do |message|
            strategy.run(message) do
              procs[message.routing_key].call(message)
            end
          end
        end
      end

      # Disconnects and forgets the memoized connection, session and strategy
      # so that the next access rebuilds them.
      #
      # The strategy is cleared as well because it is constructed with the
      # session (see #strategy); keeping it would leave it bound to the
      # session that has just been discarded.
      def reset_connection
        connection.disconnect
        @connection = nil
        @session    = nil
        @strategy   = nil
      end

      alias_method :disconnect, :reset_connection

      # @return the memoized requeue strategy, instantiated from the class
      #   named by the configuration and given the current session
      def strategy
        @strategy ||= @configuration.strategy.new(session)
      end

      # @return [Session] the memoized session, built from the current
      #   connection and the configured session options
      def session
        @session ||= Session.new(connection, @configuration.session_options)
      end

      private

      # @return [Connection] the memoized connection, built from the
      #   configured connection options
      def connection
        @connection ||= Connection.new(@configuration.connection_options)
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
basquiat-1.2.0 | lib/basquiat/adapters/rabbitmq_adapter.rb |