Sha256: 4e6f5c887e16d13a2a7c8a2d2033b4e7a8f6c2406a1f59b0b24b6ba858a3b85c

Contents?: true

Size: 1.64 KB

Versions: 1

Compression:

Stored size: 1.64 KB

Contents

require_relative 'env'

require 'alephant/support'
require 'alephant/sequencer'
require 'alephant/cache'
require 'alephant/logger'
require 'alephant/views'
require 'alephant/renderer'
require 'alephant/lookup'

require "alephant/publisher/version"
require 'alephant/publisher/models/render_mapper'
require 'alephant/publisher/models/writer'
require 'alephant/publisher/models/queue'

module Alephant
  module Publisher
    include ::Alephant::Logger

    # Factory entry point: builds a new Publisher instance.
    #
    # @param opts [Hash] configuration forwarded unchanged to Publisher#initialize
    #   (defaults to an empty hash)
    # @param logger [Object, nil] logger forwarded unchanged to Publisher#initialize
    #   (defaults to nil)
    # @return [Publisher] the newly constructed publisher
    def self.create(opts = {}, logger = nil)
      Publisher.new(opts, logger)
    end

    private

    # Ties together a message parser, a sequencer, a queue and a writer:
    # messages yielded by the queue are written out when the sequencer
    # reports them as sequential.
    class Publisher
      attr_reader :sequencer, :queue, :writer, :parser

      # Builds the collaborators from the supplied options.
      #
      # @param opts [Hash] reads :msg_vary_id_path, :sequencer_table_name,
      #   :sqs_queue_url and :sequence_id_path directly; the writer receives
      #   only the subset of keys selected by #writer_options
      # @param logger [Object, nil] handed to Alephant::Logger.set_logger
      #   unless it is nil
      def initialize(opts, logger)
        unless logger.nil?
          ::Alephant::Logger.set_logger(logger)
        end

        @parser    = Support::Parser.new(opts[:msg_vary_id_path])
        @sequencer = Sequencer.create(
          opts[:sequencer_table_name],
          opts[:sqs_queue_url],
          opts[:sequence_id_path]
        )
        @queue     = Queue.new(opts[:sqs_queue_url])
        @writer    = Writer.new(writer_options(opts))
      end

      # Starts polling the queue on a new thread, passing every yielded
      # message to #receive.
      #
      # @return [Thread] the thread running the poll
      def run!
        Thread.new do
          @queue.poll do |message|
            receive(message)
          end
        end
      end

      # Writes the message when the sequencer says it is sequential.
      #
      # @param msg [Object] message as yielded by the queue
      # @return [Object, nil] result of #write, or nil when the message is skipped
      def receive(msg)
        return unless sequencer.sequential?(msg)

        write(msg)
      end

      private

      # Picks out the entries of +opts+ that are passed on to the writer.
      def writer_options(opts)
        accepted = [
          :renderer_id,
          :s3_bucket_id,
          :s3_object_path,
          :view_path,
          :lookup_table_name
        ]

        opts.select { |key, _value| accepted.include?(key) }
      end

      # Hands the parsed message and its sequence id to the writer, then
      # records the message as the last one seen by the sequencer.
      def write(msg)
        parsed      = parser.parse(msg)
        sequence_id = sequencer.sequence_id_from(msg)

        writer.write(parsed, sequence_id)
        sequencer.set_last_seen(msg)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
alephant-publisher-0.0.1 lib/alephant/publisher.rb