lib/alephant/publisher/writer.rb in alephant-publisher-0.1.4 vs lib/alephant/publisher/writer.rb in alephant-publisher-0.1.5

- old
+ new

@@ -1,77 +1,110 @@ +require 'peach' +require 'crimp' + require 'alephant/cache' require 'alephant/views' require 'alephant/renderer' require 'alephant/lookup' - -require 'alephant/publisher/write_operation' +require 'alephant/logger' +require 'alephant/sequencer' +require 'alephant/support/parser' require 'alephant/publisher/render_mapper' module Alephant module Publisher class Writer - attr_reader :mapper, :cache + include ::Alephant::Logger + attr_reader :config, :message, :cache, :parser, :mapper - def initialize(opts) + def initialize(config, message) + @config = config + @message = message + @cache = Cache.new( - opts[:s3_bucket_id], - opts[:s3_object_path] + config[:s3_bucket_id], + config[:s3_object_path] ) + @parser = Support::Parser.new( + config[:msg_vary_id_path] + ) + @mapper = RenderMapper.new( - opts[:renderer_id], - opts[:view_path] + config[:renderer_id], + config[:view_path] ) + end - @lookup_table_name = opts[:lookup_table_name] + def run! + batch? ? batch.sequence(message, &perform) : perform.call + end - @renderer_id = opts[:renderer_id] + private - @write_opts = { - :sequencer_opts => { - :table_name => opts[:sequencer_table_name], - :id_path => opts[:sequence_id_path] - }, - :msg_vary_path => opts[:msg_vary_id_path], - :renderer_id => opts[:renderer_id] - } + def perform + Proc.new { renders.peach { |id, r| write(id, r) } } end - def write(msg) - write_op = WriteOperation.new(msg, @write_opts) - - write_op.batch_sequencer.sequence(msg) do |msg| - mapper.generate(write_op.data).each do |component_id, renderer| - write_component(write_op, component_id, renderer) + def write(id, r) + begin + seq_for(id).sequence(message) do + store(id, r.render, location_for(id)) end + rescue Exception => e + logger.warn "Alephant::Publisher::Writer#write: #{e.message}\n#{e.backtrace.join('\n')}" + + raise e end end - private + def store(id, content, location) + cache.put(location, content) + lookup.write(id, options, seq_id, location) + end - def write_component(write_op, component_id, renderer) - location = location_for(component_id, write_op.options, write_op.version) - component_sequencer = write_op.sequencer_for(component_id, write_op.options) + def location_for(id) + "#{config[:renderer_id]}/#{id}/#{opt_hash}/#{seq_id}" + end - component_sequencer.sequence(write_op.msg) do |msg| - store(component_id, renderer.render, write_op.options, location) - end + def batch + @batch ||= (renders.count > 1) ? seq_for(config[:renderer_id]) : nil end - def store(component_id, content, options, location) - cache.put(location, content) - lookup_for(component_id).write(options, location) + def batch? + !batch.nil? end - def lookup_for(id) - Lookup.create(@lookup_table_name, id) + def seq_for(id) + Sequencer.create(config[:sequencer_table_name], seq_key_from(id), config[:sequence_id_path]) end - def location_for(component_id, options, version = nil) - options_hash = Crimp.signature(options) - base_name = "#{@renderer_id}/#{component_id}/#{options_hash}" - version ? "#{base_name}/#{version}" : base_name + def seq_key_from(id) + "#{id}/#{opt_hash}" end + def seq_id + @seq_id ||= Sequencer::Sequencer.sequence_id_from(message, config[:sequence_id_path]) + end + + def renders + @renders ||= mapper.generate(data) + end + + def opt_hash + @opt_hash ||= Crimp.signature(options) + end + + def options + @options ||= data[:options] + end + + def data + @data ||= parser.parse(message) + end + + def lookup + Lookup.create(config[:lookup_table_name]) + end end end end