lib/msgr/connection.rb in msgr-0.4.1 vs lib/msgr/connection.rb in msgr-0.5.0
- old
+ new
@@ -1,125 +1,125 @@
+require 'bunny'
+require 'multi_json'
+
module Msgr
class Connection
- include Celluloid
include Logging
- attr_reader :conn, :dispatcher, :routes, :opts
- finalizer :close
+ EXCHANGE_NAME = 'msgr'
- def initialize(conn, routes, dispatcher, opts = {})
- @conn = conn
+ attr_reader :uri, :config
+
+ def initialize(uri, config, dispatcher)
+ @uri = uri
+ @config = config
@dispatcher = dispatcher
- @routes = routes
- @opts = opts
+ end
- @channel = conn.create_channel
- @channel.prefetch(10)
+ def running?
+ subscriptions.any?
+ end
- bind
+ def publish(payload, opts = {})
+ opts[:routing_key] = opts.delete(:to) if opts[:to]
+
+ begin
+ payload = MultiJson.dump(payload)
+ exchange.publish payload, opts.merge(persistent: true, content_type: 'application/json')
+ rescue => error
+ exchange.publish payload.to_s, opts.merge(persistent: true, content_type: 'application/text')
+ end
+
+ log(:debug) { "Published message to #{opts[:routing_key]}" }
end
- def rebind
- release
- bind
+ def connection
+ @connection ||= ::Bunny.new(config).tap { |b| b.start }
end
- def bind
- # Create new bindings
- routes.each { |route| bindings << Binding.new(Actor.current, route, dispatcher) }
-
- log(:debug) { 'New routes bound.' }
+ def channel
+ @channel ||= connection.create_channel
end
- def prefix(name = '')
- opts[:prefix] ? "#{opts[:prefix]}-#{name}" : name
+ def release
+ subscriptions.each { |subscription| subscription.cancel }
end
- # Used to store al bindings. Allows use to
- # release bindings when receiver should not longer
- # receive messages but channel need to be open
- # to allow further acknowledgments.
- #
- def bindings
- @bindings ||= []
+ def subscriptions
+ @subscription ||= []
end
- def queue(name)
- @channel.queue(prefix(name), durable: true).tap do |queue|
- log(:debug) { "Create queue #{queue.name} (durable: #{queue.durable?}, auto_delete: #{queue.auto_delete?})" }
+ def prefix(name)
+ if config[:prefix].present?
+ "#{config[:prefix]}-#{name}"
+ else
+ name
end
end
def exchange
- unless @exchange
- @exchange = @channel.topic prefix('msgr'), durable: true
-
- log(:debug) { "Created exchange #{@exchange.name} (type: #{@exchange.type}, durable: #{@exchange.durable?}, auto_delete: #{@exchange.auto_delete?})" }
+ @exchange ||= channel.topic(prefix(EXCHANGE_NAME), durable: true).tap do |ex|
+ log(:debug) do
+ "Created exchange #{ex.name} (type: #{ex.type}, " \
+ "durable: #{ex.durable?}, auto_delete: #{ex.auto_delete?})"
+ end
end
-
- @exchange
end
- # Release all bindings but do not close channel. Will not
- # longer receive any message but channel can be used to
- # acknowledge currently processing messages.
- #
- def release(wait = false)
- return unless bindings.any?
-
- log(:debug) { "Release all bindings#{wait ? ' after queues are empty': ''}..." }
-
- if wait
- binds = bindings.dup
- while binds.any?
- binds.reject! { |b| b.release_if_empty }
- sleep 1
+ def queue(name)
+ channel.queue(prefix(name), durable: true).tap do |queue|
+ log(:debug) do
+ "Create queue #{queue.name} (durable: #{queue.durable?}, " \
+ "auto_delete: #{queue.auto_delete?})"
end
- else
- bindings.each &:release
end
-
- log(:debug) { 'All bindings released.' }
end
- def delete
- return unless bindings.any?
-
- log(:debug) { 'Delete all bindings and exchange.' }
-
- bindings.each { |binding| binding.delete }
- bindings.clear
-
- @exchange.delete if @exchange
- end
-
- def publish(payload, opts = {})
- opts[:routing_key] ||= opts[:to]
- raise ArgumentError, 'Missing routing key.' unless opts[:routing_key]
-
- log(:debug) { "Publish message to #{opts[:routing_key]}" }
-
- begin
- payload = JSON.generate(payload)
- exchange.publish payload, opts.merge(persistent: true, content_type: 'application/json')
- rescue => error
- exchange.publish payload.to_s, opts.merge(persistent: true, content_type: 'application/text')
+ def bind(routes)
+ if routes.empty?
+ log(:warn) { "No routes to bound to. Bind will have no effect. (#{routes.inspect})" }
+ else
+ bind_all(routes)
end
end
def ack(delivery_tag)
- log(:debug) { "Ack message: #{delivery_tag}" }
- @channel.ack delivery_tag
+ channel.ack delivery_tag
+ log(:debug) { "Acked message: #{delivery_tag}" }
end
def reject(delivery_tag, requeue = true)
- log(:debug) { "Reject message: #{delivery_tag}" }
- @channel.reject delivery_tag, requeue
+ channel.reject delivery_tag, requeue
+ log(:debug) { "Rejected message: #{delivery_tag}" }
end
def close
- @channel.close if @channel.open?
- log(:debug) { 'Connection closed.' }
+ channel.close if @channel && @channel.open?
+ connection.close if @connection
+ log(:debug) { 'Closed.' }
+ end
+
+ private
+ def bind_all(routes)
+ routes.each do |route|
+ queue = queue(route.name)
+
+ route.keys.each do |key|
+ log(:debug) { "Bind #{key} to #{queue.name}." }
+
+ queue.bind exchange, routing_key: key
+ end
+
+ subscriptions << queue.subscribe(ack: true) do |*args|
+ begin
+ @dispatcher.call Message.new(self, *args, route)
+ rescue => err
+ log(:error) do
+ "Rescued error from subscribe: #{err.class.name}: #{err}\n#{err.backtrace.join("\n")}"
+ end
+ end
+ end
+ end
end
end
end