lib/propono/services/publisher.rb in propono-0.8.2 vs lib/propono/services/publisher.rb in propono-0.9.0

- old
+ new

@@ -9,53 +9,63 @@ def self.publish(topic_id, message, options = {}) new(topic_id, message, options).publish end - attr_reader :topic_id, :message, :protocol + attr_reader :topic_id, :message, :protocol, :id def initialize(topic_id, message, options = {}) raise PublisherError.new("Topic is nil") if topic_id.nil? raise PublisherError.new("Message is nil") if message.nil? + options = options.symbolize_keys + @topic_id = topic_id @message = message @protocol = options.fetch(:protocol, :sns).to_sym + @id = SecureRandom.hex(3) + @id = "#{options[:id]}-#{@id}" if options[:id] end def publish - Propono.config.logger.info "Propono: Publishing #{message} to #{topic_id} via #{protocol}" + Propono.config.logger.info "Propono [#{id}]: Publishing #{message} to #{topic_id} via #{protocol}" send("publish_via_#{protocol}") end private def publish_via_sns topic = TopicCreator.find_or_create(topic_id) - msg = message.is_a?(String) ? message : message.to_json Thread.new do begin - sns.publish(topic.arn, msg) + sns.publish(topic.arn, body.to_json) rescue => e - Propono.config.logger.error "Propono failed to send via sns : #{e}" + Propono.config.logger.error "Propono [#{id}]: Failed to send via sns : #{e}" end end end def publish_via_udp - payload = {topic: topic_id, message: message}.to_json + payload = body.merge(topic: topic_id).to_json UDPSocket.new.send(payload, 0, Propono.config.udp_host, Propono.config.udp_port) rescue => e - Propono.config.logger.error "Propono failed to send : #{e}" + Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}" end def publish_via_tcp - payload = {topic: topic_id, message: message}.to_json + payload = body.merge(topic: topic_id).to_json socket = TCPSocket.new(Propono.config.tcp_host, Propono.config.tcp_port) socket.write payload socket.close rescue => e - Propono.config.logger.error "Propono failed to send : #{e}" + Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}" + end + + def body + { + id: id, + message: message + } end end end