Sha256: 36b2052d86a393205ec7f0e0617d1bce9f2673f72e18175814eccf9e6558ed40

Contents?: true

Size: 1.21 KB

Versions: 2

Compression:

Stored size: 1.21 KB

Contents

# frozen_string_literal: true

module Totoro
  # Publishes JSON-encoded payloads to a configured queue through the
  # channel's default exchange.
  #
  # Every call to {#enqueue} works on its own channel: the channel is created
  # lazily, closed when the call finishes (successfully or not) and then
  # forgotten, so one service instance can be used for any number of messages.
  class EnqueueService
    # @param connection [#connected?, #start, #create_channel] broker
    #   connection (presumably a Bunny session, judging by the rescued errors)
    # @param config [#queue, #queue_persistent?] per-queue settings; +queue(id)+
    #   must return the argument list that is splatted into +channel.queue+
    def initialize(connection, config)
      @connection = connection
      @config = config
    end

    # Serializes +payload+ to JSON and publishes it, routed by the name of the
    # queue that +id+ resolves to.
    #
    # @param id queue identifier understood by the config object
    # @param payload object serializable with +JSON.dump+
    # @param attrs [Hash] extra publish options; merged last, so they override
    #   the default +persistent+ / +routing_key+ values
    # @return the result of closing the channel
    # @raise [Totoro::ConnectionBreakError] when a connection-level error is
    #   raised while talking to the broker
    def enqueue(id, payload, attrs = {})
      @connection.start unless @connection.connected?
      queue = channel.queue(*@config.queue(id))
      body = JSON.dump(payload)
      exchange.publish(body, options(id, queue.name, attrs))
      Rails.logger.debug "send message to #{queue.name}"
      STDOUT.flush
      channel.close
    rescue Bunny::TCPConnectionFailedForAllHosts,
           Bunny::NetworkErrorWrapper,
           Bunny::ChannelAlreadyClosed,
           Bunny::ConnectionAlreadyClosed,
           AMQ::Protocol::EmptyResponseError => error
      raise(Totoro::ConnectionBreakError, "type: #{error.class}, message: #{error.message}")
    ensure
      # Runs on every exit path: closes a channel that is still open (errors
      # other than the ones rescued above used to leak it) and drops the
      # memoized channel/exchange so the next call does not reuse a closed one.
      release_channel
    end

    private

    def options(queue_id, queue_name, attrs)
      { persistent: @config.queue_persistent?(queue_id), routing_key: queue_name }.merge(attrs)
    end

    def channel
      @channel ||= @connection.create_channel
    end

    # default exchange is a direct exchange
    def exchange
      @exchange ||= channel.default_exchange
    end

    # Forgets the memoized channel and exchange and closes the channel if it
    # is still open. Cleanup is best effort: a failure to close is logged and
    # swallowed so it cannot replace the error that is already propagating.
    def release_channel
      stale = @channel
      @channel = nil
      @exchange = nil
      # Closing an already closed channel raises, so only close open ones.
      stale.close if stale&.open?
    rescue StandardError => error
      Rails.logger.warn "failed to close channel: #{error.class}: #{error.message}"
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
totoro-1.0.7 lib/totoro/services/enqueue_service.rb
totoro-1.0.6 lib/totoro/services/enqueue_service.rb