Sha256: 3cbe34d649b69dc1b060dda274a1f9e2ca01996485efc720c636d6897f3d159d

Contents?: true

Size: 1.65 KB

Versions: 4

Compression:

Stored size: 1.65 KB

Contents

require_relative 'env'

require 'alephant/support/parser'
require 'alephant/sequencer'
require 'alephant/cache'
require 'alephant/logger'
require 'alephant/views'
require 'alephant/renderer'
require 'alephant/lookup'

require "alephant/publisher/version"
require 'alephant/publisher/models/render_mapper'
require 'alephant/publisher/models/writer'
require 'alephant/publisher/models/queue'

module Alephant
  module Publisher
    include ::Alephant::Logger

    def self.create(opts = {}, logger = nil)
      Publisher.new(opts, logger)
    end

    private

    class Publisher
      attr_reader :sequencer, :queue, :writer, :parser

      def initialize(opts, logger)
        ::Alephant::Logger.set_logger(logger) unless logger.nil?

        @parser = Support::Parser.new(
          opts[:msg_vary_id_path]
        )

        @sequencer = Sequencer.create(
          opts[:sequencer_table_name],
          opts[:sqs_queue_url],
          opts[:sequence_id_path]
        )

        @queue = Queue.new(
          opts[:sqs_queue_url]
        )

        @writer = Writer.new(
          opts.select do |k,v|
            [
              :renderer_id,
              :s3_bucket_id,
              :s3_object_path,
              :view_path,
              :lookup_table_name
            ].include? k
          end
        )
      end

      def run!
        Thread.new do
          @queue.poll { |msg| receive(msg) }
        end
      end

      def receive(msg)
        write msg if sequencer.sequential?(msg)
      end

      private

      def write(msg)
        writer.write(parser.parse(msg), sequencer.sequence_id_from(msg))
        sequencer.set_last_seen(msg)
      end

    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
alephant-publisher-0.1.0 lib/alephant/publisher.rb
alephant-publisher-0.0.4 lib/alephant/publisher.rb
alephant-publisher-0.0.3 lib/alephant/publisher.rb
alephant-publisher-0.0.2 lib/alephant/publisher.rb