lib/propono/services/publisher.rb in propono-1.7.0 vs lib/propono/services/publisher.rb in propono-2.0.0.rc1

- old
+ new

@@ -3,77 +3,54 @@ module Propono class PublisherError < ProponoError end class Publisher - include Sns - - def self.publish(topic_id, message, options = {}) - new(topic_id, message, options).publish + def self.publish(*args) + new(*args).publish end - attr_reader :topic_id, :message, :protocol, :id, :async + attr_reader :aws_client, :propono_config, :topic_name, :message, :id, :async - def initialize(topic_id, message, options = {}) - raise PublisherError.new("Topic is nil") if topic_id.nil? + def initialize(aws_client, propono_config, topic_name, message, async: false, id: nil) + raise PublisherError.new("Topic is nil") if topic_name.nil? raise PublisherError.new("Message is nil") if message.nil? - options = Propono::Utils.symbolize_keys options - - @topic_id = topic_id + @aws_client = aws_client + @propono_config = propono_config + @topic_name = topic_name @message = message - @protocol = options.fetch(:protocol, :sns).to_sym - @id = SecureRandom.hex(3) - @id = "#{options[:id]}-#{@id}" if options[:id] - @async = options.fetch(:async, true) + @async = async + + random_id = SecureRandom.hex(3) + @id = id ? "#{id}-#{random_id}" : random_id end def publish - Propono.config.logger.info "Propono [#{id}]: Publishing #{message} to #{topic_id} via #{protocol}" - send("publish_via_#{protocol}") + propono_config.logger.info "Propono [#{id}]: Publishing #{message} to #{topic_name}" + async ? publish_asyncronously : publish_syncronously end private - def publish_via_sns - async ? publish_via_sns_asyncronously : publish_via_sns_syncronously + def publish_asyncronously + Thread.new { publish_syncronously } end - def publish_via_sns_asyncronously - Thread.new { publish_via_sns_syncronously } - end - - def publish_via_sns_syncronously + def publish_syncronously begin - topic = TopicCreator.find_or_create(topic_id) + topic = aws_client.create_topic(topic_name) rescue => e - Propono.config.logger.error "Propono [#{id}]: Failed to create topic #{topic_id}: #{e}" + propono_config.logger.error "Propono [#{id}]: Failed to get or create topic #{topic_name}: #{e}" raise end begin - sns.publish(topic.arn, body.to_json) + aws_client.publish_to_sns(topic, body) rescue => e - Propono.config.logger.error "Propono [#{id}]: Failed to send via sns: #{e}" + propono_config.logger.error "Propono [#{id}]: Failed to send via sns: #{e}" raise end - end - - def publish_via_udp - payload = body.merge(topic: topic_id).to_json - UDPSocket.new.send(payload, 0, Propono.config.udp_host, Propono.config.udp_port) - rescue => e - Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}" - end - - def publish_via_tcp - payload = body.merge(topic: topic_id).to_json - - socket = TCPSocket.new(Propono.config.tcp_host, Propono.config.tcp_port) - socket.write payload - socket.close - rescue => e - Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}" end def body { id: id,