lib/alephant/publisher/queue/sqs_helper/archiver.rb in alephant-publisher-queue-2.0.3 vs lib/alephant/publisher/queue/sqs_helper/archiver.rb in alephant-publisher-queue-2.1.0
- old
+ new
@@ -7,16 +7,17 @@
module Queue
module SQSHelper
class Archiver
include Logger
- attr_reader :cache, :async, :log_message_body
+ attr_reader :cache, :async, :log_message_body, :log_validator
def initialize(cache, opts)
@cache = cache
@async = opts[:async_store]
@log_message_body = opts[:log_archive_message]
+ @log_validator = opts[:log_validator] || -> _ { true }
end
def see(message)
return if message.nil?
message.tap do |m|
@@ -43,24 +44,11 @@
store_item(message).tap do
logger.info(
"event" => "MessageStored",
"messageBody" => msg_body,
"method" => "#{self.class}#store"
- ) if newsbeat_uri? msg_body
+ ) if log_validator.(msg_body)
end
- end
-
- def newsbeat_uri?(msg_body)
- jsb = ::JSON.parse(msg_body)
- msg = ::JSON.parse(jsb["Message"]) if jsb["Message"]
-
- true if verify_message_content msg
- rescue ::JSON::ParserError
- false
- end
-
- def verify_message_content(msg)
- !msg.nil? && msg["uri"] && msg["uri"] =~ /content\/asset\/newsbeat/
end
def store_item(message)
cache.put(
cache_key(message.id),