lib/propono/services/publisher.rb in propono-1.7.0 vs lib/propono/services/publisher.rb in propono-2.0.0.rc1
- old
+ new
@@ -3,77 +3,54 @@
module Propono
class PublisherError < ProponoError
end
class Publisher
- include Sns
-
- def self.publish(topic_id, message, options = {})
- new(topic_id, message, options).publish
+ def self.publish(*args)
+ new(*args).publish
end
- attr_reader :topic_id, :message, :protocol, :id, :async
+ attr_reader :aws_client, :propono_config, :topic_name, :message, :id, :async
- def initialize(topic_id, message, options = {})
- raise PublisherError.new("Topic is nil") if topic_id.nil?
+ def initialize(aws_client, propono_config, topic_name, message, async: false, id: nil)
+ raise PublisherError.new("Topic is nil") if topic_name.nil?
raise PublisherError.new("Message is nil") if message.nil?
- options = Propono::Utils.symbolize_keys options
-
- @topic_id = topic_id
+ @aws_client = aws_client
+ @propono_config = propono_config
+ @topic_name = topic_name
@message = message
- @protocol = options.fetch(:protocol, :sns).to_sym
- @id = SecureRandom.hex(3)
- @id = "#{options[:id]}-#{@id}" if options[:id]
- @async = options.fetch(:async, true)
+ @async = async
+
+ random_id = SecureRandom.hex(3)
+ @id = id ? "#{id}-#{random_id}" : random_id
end
def publish
- Propono.config.logger.info "Propono [#{id}]: Publishing #{message} to #{topic_id} via #{protocol}"
- send("publish_via_#{protocol}")
+ propono_config.logger.info "Propono [#{id}]: Publishing #{message} to #{topic_name}"
+ async ? publish_asyncronously : publish_syncronously
end
private
- def publish_via_sns
- async ? publish_via_sns_asyncronously : publish_via_sns_syncronously
+ def publish_asyncronously
+ Thread.new { publish_syncronously }
end
- def publish_via_sns_asyncronously
- Thread.new { publish_via_sns_syncronously }
- end
-
- def publish_via_sns_syncronously
+ def publish_syncronously
begin
- topic = TopicCreator.find_or_create(topic_id)
+ topic = aws_client.create_topic(topic_name)
rescue => e
- Propono.config.logger.error "Propono [#{id}]: Failed to create topic #{topic_id}: #{e}"
+ propono_config.logger.error "Propono [#{id}]: Failed to get or create topic #{topic_name}: #{e}"
raise
end
begin
- sns.publish(topic.arn, body.to_json)
+ aws_client.publish_to_sns(topic, body)
rescue => e
- Propono.config.logger.error "Propono [#{id}]: Failed to send via sns: #{e}"
+ propono_config.logger.error "Propono [#{id}]: Failed to send via sns: #{e}"
raise
end
- end
-
- def publish_via_udp
- payload = body.merge(topic: topic_id).to_json
- UDPSocket.new.send(payload, 0, Propono.config.udp_host, Propono.config.udp_port)
- rescue => e
- Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}"
- end
-
- def publish_via_tcp
- payload = body.merge(topic: topic_id).to_json
-
- socket = TCPSocket.new(Propono.config.tcp_host, Propono.config.tcp_port)
- socket.write payload
- socket.close
- rescue => e
- Propono.config.logger.error "Propono [#{id}]: Failed to send : #{e}"
end
def body
{
id: id,