lib/alephant/publisher/writer.rb in alephant-publisher-0.1.4 vs lib/alephant/publisher/writer.rb in alephant-publisher-0.1.5
- old
+ new
@@ -1,77 +1,110 @@
+require 'peach'
+require 'crimp'
+
require 'alephant/cache'
require 'alephant/views'
require 'alephant/renderer'
require 'alephant/lookup'
-
-require 'alephant/publisher/write_operation'
+require 'alephant/logger'
+require 'alephant/sequencer'
+require 'alephant/support/parser'
require 'alephant/publisher/render_mapper'
module Alephant
module Publisher
class Writer
- attr_reader :mapper, :cache
+ include ::Alephant::Logger
+ attr_reader :config, :message, :cache, :parser, :mapper
- def initialize(opts)
+ def initialize(config, message)
+ @config = config
+ @message = message
+
@cache = Cache.new(
- opts[:s3_bucket_id],
- opts[:s3_object_path]
+ config[:s3_bucket_id],
+ config[:s3_object_path]
)
+ @parser = Support::Parser.new(
+ config[:msg_vary_id_path]
+ )
+
@mapper = RenderMapper.new(
- opts[:renderer_id],
- opts[:view_path]
+ config[:renderer_id],
+ config[:view_path]
)
+ end
- @lookup_table_name = opts[:lookup_table_name]
+ def run!
+ batch? ? batch.sequence(message, &perform) : perform.call
+ end
- @renderer_id = opts[:renderer_id]
+ private
- @write_opts = {
- :sequencer_opts => {
- :table_name => opts[:sequencer_table_name],
- :id_path => opts[:sequence_id_path]
- },
- :msg_vary_path => opts[:msg_vary_id_path],
- :renderer_id => opts[:renderer_id]
- }
+ def perform
+ Proc.new { renders.peach { |id, r| write(id, r) } }
end
- def write(msg)
- write_op = WriteOperation.new(msg, @write_opts)
-
- write_op.batch_sequencer.sequence(msg) do |msg|
- mapper.generate(write_op.data).each do |component_id, renderer|
- write_component(write_op, component_id, renderer)
+ def write(id, r)
+ begin
+ seq_for(id).sequence(message) do
+ store(id, r.render, location_for(id))
end
+ rescue Exception => e
+ logger.warn "Alephant::Publisher::Writer#write: #{e.message}\n#{e.backtrace.join('\n')}"
+
+ raise e
end
end
- private
+ def store(id, content, location)
+ cache.put(location, content)
+ lookup.write(id, options, seq_id, location)
+ end
- def write_component(write_op, component_id, renderer)
- location = location_for(component_id, write_op.options, write_op.version)
- component_sequencer = write_op.sequencer_for(component_id, write_op.options)
+ def location_for(id)
+ "#{config[:renderer_id]}/#{id}/#{opt_hash}/#{seq_id}"
+ end
- component_sequencer.sequence(write_op.msg) do |msg|
- store(component_id, renderer.render, write_op.options, location)
- end
+ def batch
+ @batch ||= (renders.count > 1) ? seq_for(config[:renderer_id]) : nil
end
- def store(component_id, content, options, location)
- cache.put(location, content)
- lookup_for(component_id).write(options, location)
+ def batch?
+ !batch.nil?
end
- def lookup_for(id)
- Lookup.create(@lookup_table_name, id)
+ def seq_for(id)
+ Sequencer.create(config[:sequencer_table_name], seq_key_from(id), config[:sequence_id_path])
end
- def location_for(component_id, options, version = nil)
- options_hash = Crimp.signature(options)
- base_name = "#{@renderer_id}/#{component_id}/#{options_hash}"
- version ? "#{base_name}/#{version}" : base_name
+ def seq_key_from(id)
+ "#{id}/#{opt_hash}"
end
+ def seq_id
+ @seq_id ||= Sequencer::Sequencer.sequence_id_from(message, config[:sequence_id_path])
+ end
+
+ def renders
+ @renders ||= mapper.generate(data)
+ end
+
+ def opt_hash
+ @opt_hash ||= Crimp.signature(options)
+ end
+
+ def options
+ @options ||= data[:options]
+ end
+
+ def data
+ @data ||= parser.parse(message)
+ end
+
+ def lookup
+ Lookup.create(config[:lookup_table_name])
+ end
end
end
end