lib/msgr/connection.rb in msgr-0.4.1 vs lib/msgr/connection.rb in msgr-0.5.0

- old
+ new

@@ -1,125 +1,125 @@ +require 'bunny' +require 'multi_json' + module Msgr class Connection - include Celluloid include Logging - attr_reader :conn, :dispatcher, :routes, :opts - finalizer :close + EXCHANGE_NAME = 'msgr' - def initialize(conn, routes, dispatcher, opts = {}) - @conn = conn + attr_reader :uri, :config + + def initialize(uri, config, dispatcher) + @uri = uri + @config = config @dispatcher = dispatcher - @routes = routes - @opts = opts + end - @channel = conn.create_channel - @channel.prefetch(10) + def running? + subscriptions.any? + end - bind + def publish(payload, opts = {}) + opts[:routing_key] = opts.delete(:to) if opts[:to] + + begin + payload = MultiJson.dump(payload) + exchange.publish payload, opts.merge(persistent: true, content_type: 'application/json') + rescue => error + exchange.publish payload.to_s, opts.merge(persistent: true, content_type: 'application/text') + end + + log(:debug) { "Published message to #{opts[:routing_key]}" } end - def rebind - release - bind + def connection + @connection ||= ::Bunny.new(config).tap { |b| b.start } end - def bind - # Create new bindings - routes.each { |route| bindings << Binding.new(Actor.current, route, dispatcher) } - - log(:debug) { 'New routes bound.' } + def channel + @channel ||= connection.create_channel end - def prefix(name = '') - opts[:prefix] ? "#{opts[:prefix]}-#{name}" : name + def release + subscriptions.each { |subscription| subscription.cancel } end - # Used to store al bindings. Allows use to - # release bindings when receiver should not longer - # receive messages but channel need to be open - # to allow further acknowledgments. - # - def bindings - @bindings ||= [] + def subscriptions + @subscription ||= [] end - def queue(name) - @channel.queue(prefix(name), durable: true).tap do |queue| - log(:debug) { "Create queue #{queue.name} (durable: #{queue.durable?}, auto_delete: #{queue.auto_delete?})" } + def prefix(name) + if config[:prefix].present? + "#{config[:prefix]}-#{name}" + else + name end end def exchange - unless @exchange - @exchange = @channel.topic prefix('msgr'), durable: true - - log(:debug) { "Created exchange #{@exchange.name} (type: #{@exchange.type}, durable: #{@exchange.durable?}, auto_delete: #{@exchange.auto_delete?})" } + @exchange ||= channel.topic(prefix(EXCHANGE_NAME), durable: true).tap do |ex| + log(:debug) do + "Created exchange #{ex.name} (type: #{ex.type}, " \ + "durable: #{ex.durable?}, auto_delete: #{ex.auto_delete?})" + end end - - @exchange end - # Release all bindings but do not close channel. Will not - # longer receive any message but channel can be used to - # acknowledge currently processing messages. - # - def release(wait = false) - return unless bindings.any? - - log(:debug) { "Release all bindings#{wait ? ' after queues are empty': ''}..." } - - if wait - binds = bindings.dup - while binds.any? - binds.reject! { |b| b.release_if_empty } - sleep 1 + def queue(name) + channel.queue(prefix(name), durable: true).tap do |queue| + log(:debug) do + "Create queue #{queue.name} (durable: #{queue.durable?}, " \ + "auto_delete: #{queue.auto_delete?})" end - else - bindings.each &:release end - - log(:debug) { 'All bindings released.' } end - def delete - return unless bindings.any? - - log(:debug) { 'Delete all bindings and exchange.' } - - bindings.each { |binding| binding.delete } - bindings.clear - - @exchange.delete if @exchange - end - - def publish(payload, opts = {}) - opts[:routing_key] ||= opts[:to] - raise ArgumentError, 'Missing routing key.' unless opts[:routing_key] - - log(:debug) { "Publish message to #{opts[:routing_key]}" } - - begin - payload = JSON.generate(payload) - exchange.publish payload, opts.merge(persistent: true, content_type: 'application/json') - rescue => error - exchange.publish payload.to_s, opts.merge(persistent: true, content_type: 'application/text') + def bind(routes) + if routes.empty? + log(:warn) { "No routes to bound to. Bind will have no effect. (#{routes.inspect})" } + else + bind_all(routes) end end def ack(delivery_tag) - log(:debug) { "Ack message: #{delivery_tag}" } - @channel.ack delivery_tag + channel.ack delivery_tag + log(:debug) { "Acked message: #{delivery_tag}" } end def reject(delivery_tag, requeue = true) - log(:debug) { "Reject message: #{delivery_tag}" } - @channel.reject delivery_tag, requeue + channel.reject delivery_tag, requeue + log(:debug) { "Rejected message: #{delivery_tag}" } end def close - @channel.close if @channel.open? - log(:debug) { 'Connection closed.' } + channel.close if @channel && @channel.open? + connection.close if @connection + log(:debug) { 'Closed.' } + end + + private + def bind_all(routes) + routes.each do |route| + queue = queue(route.name) + + route.keys.each do |key| + log(:debug) { "Bind #{key} to #{queue.name}." } + + queue.bind exchange, routing_key: key + end + + subscriptions << queue.subscribe(ack: true) do |*args| + begin + @dispatcher.call Message.new(self, *args, route) + rescue => err + log(:error) do + "Rescued error from subscribe: #{err.class.name}: #{err}\n#{err.backtrace.join("\n")}" + end + end + end + end end end end