Sha256: 36b2052d86a393205ec7f0e0617d1bce9f2673f72e18175814eccf9e6558ed40
Contents?: true
Size: 1.21 KB
Versions: 2
Compression:
Stored size: 1.21 KB
Contents
# frozen_string_literal: true

module Totoro
  # Publishes JSON-encoded payloads to a configured queue through the
  # channel's default exchange, using the queue name as the routing key.
  #
  # A fresh channel is opened for every #enqueue call and released afterwards,
  # so one service instance can be reused for any number of messages.
  class EnqueueService
    # @param connection [#connected?, #start, #create_channel] broker connection
    #   (presumably a Bunny::Session -- confirm against the caller)
    # @param config [#queue, #queue_persistent?] per-queue configuration lookup
    def initialize(connection, config)
      @connection = connection
      @config = config
    end

    # Serializes +payload+ to JSON and publishes it to the queue registered
    # under +id+ in the configuration.
    #
    # @param id [Object] queue identifier understood by the config object
    # @param payload [Object] any JSON-serializable value
    # @param attrs [Hash] extra publish options; they override the defaults
    #   built by #options
    # @return [Object] the result of closing the channel
    # @raise [Totoro::ConnectionBreakError] when one of the rescued broker
    #   connectivity errors occurs while declaring, publishing or closing
    def enqueue(id, payload, attrs = {})
      @connection.start unless @connection.connected?
      queue = channel.queue(*@config.queue(id))
      exchange.publish(JSON.dump(payload), options(id, queue.name, attrs))
      Rails.logger.debug "send message to #{queue.name}"
      STDOUT.flush
      release_channel
    rescue Bunny::TCPConnectionFailedForAllHosts,
           Bunny::NetworkErrorWrapper,
           Bunny::ChannelAlreadyClosed,
           Bunny::ConnectionAlreadyClosed,
           AMQ::Protocol::EmptyResponseError => error
      begin
        release_channel
      rescue StandardError
        # Best-effort cleanup: the link is already known to be broken, and a
        # failure while closing must not mask the error reported below.
        nil
      end
      raise(Totoro::ConnectionBreakError, "type: #{error.class}, message: #{error.message}")
    end

    private

    # Builds the publish options for a message.
    #
    # @param queue_id [Object] identifier used to look up persistence
    # @param queue_name [String] declared queue name, used as the routing key
    # @param attrs [Hash] caller-supplied overrides merged on top
    # @return [Hash]
    def options(queue_id, queue_name, attrs)
      {
        persistent: @config.queue_persistent?(queue_id),
        routing_key: queue_name
      }.merge(attrs)
    end

    # Channel for the current publish; created lazily and dropped again by
    # #release_channel.
    def channel
      @channel ||= @connection.create_channel
    end

    # default exchange is a direct exchange
    def exchange
      @exchange ||= channel.default_exchange
    end

    # Forgets the memoized channel and exchange, then closes the channel if it
    # is still open. Clearing the references first guarantees that the next
    # #enqueue never reuses a closed channel, even when the close itself fails.
    def release_channel
      stale = @channel
      @channel = nil
      @exchange = nil
      stale.close if stale&.open?
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
totoro-1.0.7 | lib/totoro/services/enqueue_service.rb |
totoro-1.0.6 | lib/totoro/services/enqueue_service.rb |