Sha256: 4e6f5c887e16d13a2a7c8a2d2033b4e7a8f6c2406a1f59b0b24b6ba858a3b85c
Contents?: true
Size: 1.64 KB
Versions: 1
Compression:
Stored size: 1.64 KB
Contents
require_relative 'env' require 'alephant/support' require 'alephant/sequencer' require 'alephant/cache' require 'alephant/logger' require 'alephant/views' require 'alephant/renderer' require 'alephant/lookup' require "alephant/publisher/version" require 'alephant/publisher/models/render_mapper' require 'alephant/publisher/models/writer' require 'alephant/publisher/models/queue' module Alephant module Publisher include ::Alephant::Logger def self.create(opts = {}, logger = nil) Publisher.new(opts, logger) end private class Publisher attr_reader :sequencer, :queue, :writer, :parser def initialize(opts, logger) ::Alephant::Logger.set_logger(logger) unless logger.nil? @parser = Support::Parser.new( opts[:msg_vary_id_path] ) @sequencer = Sequencer.create( opts[:sequencer_table_name], opts[:sqs_queue_url], opts[:sequence_id_path] ) @queue = Queue.new( opts[:sqs_queue_url] ) @writer = Writer.new( opts.select do |k,v| [ :renderer_id, :s3_bucket_id, :s3_object_path, :view_path, :lookup_table_name ].include? k end ) end def run! Thread.new do @queue.poll { |msg| receive(msg) } end end def receive(msg) write msg if sequencer.sequential?(msg) end private def write(msg) writer.write(parser.parse(msg), sequencer.sequence_id_from(msg)) sequencer.set_last_seen(msg) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
alephant-publisher-0.0.1 | lib/alephant/publisher.rb |