lib/propono/services/publisher.rb in propono-0.8.2 vs lib/propono/services/publisher.rb in propono-0.9.0
- old
+ new
@@ -9,53 +9,63 @@
def self.publish(topic_id, message, options = {})
new(topic_id, message, options).publish
end
- attr_reader :topic_id, :message, :protocol
+ attr_reader :topic_id, :message, :protocol, :id
def initialize(topic_id, message, options = {})
raise PublisherError.new("Topic is nil") if topic_id.nil?
raise PublisherError.new("Message is nil") if message.nil?
+ options = options.symbolize_keys
+
@topic_id = topic_id
@message = message
@protocol = options.fetch(:protocol, :sns).to_sym
+ @id = SecureRandom.hex(3)
+ @id = "#{options[:id]}-#{@id}" if options[:id]
end
def publish
- Propono.config.logger.info "Propono: Publishing #{message} to #{topic_id} via #{protocol}"
+ Propono.config.logger.info "Propono [#{id}]: Publishing #{message} to #{topic_id} via #{protocol}"
send("publish_via_#{protocol}")
end
private
def publish_via_sns
topic = TopicCreator.find_or_create(topic_id)
- msg = message.is_a?(String) ? message : message.to_json
Thread.new do
begin
- sns.publish(topic.arn, msg)
+ sns.publish(topic.arn, body.to_json)
rescue => e
- Propono.config.logger.error "Propono failed to send via sns : #{e}"
+ Propono.config.logger.error "Propono [#{id}]: Failed to send via sns : #{e}"
end
end
end
def publish_via_udp
- payload = {topic: topic_id, message: message}.to_json
+ payload = body.merge(topic: topic_id).to_json
UDPSocket.new.send(payload, 0, Propono.config.udp_host, Propono.config.udp_port)
rescue => e
- Propono.config.logger.error "Propono failed to send : #{e}"
+ Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}"
end
def publish_via_tcp
- payload = {topic: topic_id, message: message}.to_json
+ payload = body.merge(topic: topic_id).to_json
socket = TCPSocket.new(Propono.config.tcp_host, Propono.config.tcp_port)
socket.write payload
socket.close
rescue => e
- Propono.config.logger.error "Propono failed to send : #{e}"
+ Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}"
+ end
+
+ def body
+ {
+ id: id,
+ message: message
+ }
end
end
end