Sha256: 285cd8a3b1d59c0fbfa4a7da6c941b357f994647b100e6780679d7c8c7a54704
Contents?: true
Size: 1.94 KB
Versions: 1
Compression:
Stored size: 1.94 KB
Contents
# frozen_string_literal: true

require_relative "event_publisher/version"
require "json"
require "bunny"

module Artsy
  # Publishes JSON-encoded events to RabbitMQ topic exchanges via Bunny.
  #
  # Publishing is a no-op unless {Config.enabled} is truthy.
  module EventPublisher
    # Raised when a published message is not confirmed by the broker.
    class Error < StandardError; end

    # Serializes an event to JSON and publishes it to the given topic exchange.
    #
    # Event attributes that are nil are dropped from the payload.
    #
    # @param topic [String] name of the topic exchange to publish to
    # @param routing_key [String] routing key passed along with the message
    # @param subject [Object, nil] stored under the "subject" key of the payload
    # @param verb [Object, nil] stored under the "verb" key of the payload
    # @param object [Object, nil] stored under the "object" key of the payload
    # @param properties [Object, nil] stored under the "properties" key of the payload
    # @return [Hash, nil] the event attributes that were serialized, or nil
    #   when publishing is disabled
    # @raise [Error] when +wait_for_confirms+ on the channel returns a falsy value
    def self.publish(topic, routing_key = "", subject: nil, verb: nil, object: nil, properties: nil)
      return unless Config.enabled

      data = { subject: subject, verb: verb, object: object, properties: properties }.compact
      payload = data.to_json

      Connection.publish(topic: topic, routing_key: routing_key, payload: payload)
      Config.logger&.debug("[event] #{payload}")

      data
    end

    # Yields {Config} to the block, when one is given, so settings can be assigned.
    #
    # @yieldparam config [Config] the configuration module
    # @return [Config] the configuration module
    def self.configure
      yield Config if block_given?
      Config
    end

    # Module-level settings; accessors are callable directly on the module
    # because it extends itself.
    module Config
      extend self

      # Sent as the +app_id+ publish option of every message.
      attr_accessor :app_id
      # Publishing is skipped entirely unless this is truthy.
      attr_accessor :enabled
      # Passed to +Bunny.new+ when a connection is built.
      attr_accessor :rabbitmq_url
      # Optional; when set, receives a +debug+ call with each published payload.
      attr_accessor :logger
    end

    # Owns the single shared Bunny connection and publishes over short-lived
    # channels with publisher confirms enabled.
    class Connection
      # Default publish options. Deep-frozen so the defaults cannot be altered
      # at runtime; callers always work on a merged copy.
      OPTIONS = {
        persistent: true,
        content_type: "application/json",
        headers: {}.freeze
      }.freeze

      @connection = nil
      @mutex = Mutex.new

      # Publishes +payload+ to the durable topic exchange +topic+ and waits for
      # the broker's confirmation.
      #
      # @param topic [String] name of the topic exchange
      # @param routing_key [String] routing key for the message
      # @param payload [String] message body
      # @raise [Error] when +wait_for_confirms+ returns a falsy value
      def self.publish(topic:, routing_key:, payload:)
        with_channel do |channel|
          # Hash#merge is shallow, so hand every message its own headers hash
          # instead of sharing the constant's nested one.
          options = OPTIONS.merge(
            headers: OPTIONS[:headers].dup,
            routing_key: routing_key,
            app_id: Config.app_id
          )
          channel.topic(topic, durable: true).publish(payload, options)
          raise Error, "Publishing failed" unless channel.wait_for_confirms
        end
      end

      # Opens a channel in confirm mode, yields it, and closes it afterwards
      # even when the block raises.
      #
      # @yieldparam channel the newly created channel
      # @return [Object, nil] the block's result, or nil when no block is given
      def self.with_channel
        channel = get_connection.create_channel
        channel.confirm_select
        yield channel if block_given?
      ensure
        # channel is nil when creating it failed, hence the safe navigation.
        channel.close if channel&.open?
      end

      # Returns the shared connection, building it on first use and rebuilding
      # it once it reports itself closed. Access is serialized with a mutex so
      # concurrent callers cannot build it twice.
      def self.get_connection
        @mutex.synchronize do
          @connection ||= build_connection
          @connection = build_connection if @connection.closed?
          @connection
        end
      end

      # Creates a Bunny connection for {Config.rabbitmq_url} and starts it.
      def self.build_connection
        Bunny.new(Config.rabbitmq_url).tap(&:start)
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
artsy-event_publisher-0.1.0 | lib/artsy/event_publisher.rb |