Sha256: 7d44fa460339d418fb6ddd4c72b06e911cccd14d2ae2781a4a11df9c8abc5bc7

Contents?: true

Size: 1.98 KB

Versions: 5

Compression:

Stored size: 1.98 KB

Contents

require 'aws-sdk'

require_relative 'env'

require 'alephant/models/logger'
require 'alephant/models/queue'
require 'alephant/models/cache'
require 'alephant/models/renderer'
require 'alephant/models/multi_renderer'
require 'alephant/models/sequencer'
require 'alephant/models/parser'

require 'alephant/errors'
require 'alephant/views'

module Alephant
  # Glues a queue, a sequencer, a renderer and a cache together: messages
  # polled from the queue are parsed, checked for ordering by the sequencer,
  # rendered, and the rendered items are put into the cache.
  #
  # The collaborators (Sequencer, Queue, Cache, MultiRenderer, Parser) and the
  # module-level logger accessor are defined elsewhere in the library.
  class Alephant
    # Option keys recognised by #initialize. Every key gets a public
    # reader/writer pair on instances; keys missing from the options hash are
    # set to nil and keys not listed here are ignored.
    #
    # NOTE(review): :s3_object_id is accepted but not used anywhere in this
    # class -- confirm whether it is still needed.
    VALID_OPTS = [
      :s3_bucket_id,
      :s3_object_path,
      :s3_object_id,
      :table_name,
      :sqs_queue_id,
      :view_path,
      :sequential_proc,
      :set_last_seen_proc,
      :component_id
    ].freeze

    attr_reader :sequencer, :queue, :cache, :renderer

    # Declared once on the class rather than on each instance's singleton
    # class: the resulting public accessors are the same, but instances no
    # longer carry a singleton class of their own.
    attr_accessor(*VALID_OPTS)

    # Builds the collaborators from the supplied options.
    #
    # @param opts [Hash] values keyed by the symbols in VALID_OPTS
    # @param logger [Object, nil] handed to the module-level logger setter;
    #   the instance then uses whatever ::Alephant.logger returns afterwards
    def initialize(opts = {}, logger = nil)
      set_logger(logger)
      set_opts(opts)

      @logger = ::Alephant.logger
      @sequencer = Sequencer.new({ :table_name => @table_name }, @sqs_queue_id)
      @queue = Queue.new(@sqs_queue_id)
      @cache = Cache.new(@s3_bucket_id, @s3_object_path)
      @multi_renderer = MultiRenderer.new(@component_id, @view_path)
      # The public `renderer` reader previously had no backing value and
      # always returned nil; point it at the renderer that is actually used.
      @renderer = @multi_renderer
      @parser = Parser.new
    end

    # Replaces the module-level logger.
    #
    # @param logger [Object, nil] the new logger
    def set_logger(logger)
      ::Alephant.logger = logger
    end

    # Renders +data+ and puts every rendered item into the cache under its id.
    #
    # @param data [Object] parsed message payload understood by the renderer
    # @return [Object] whatever iterating the rendered collection returns
    def write(data)
      @multi_renderer.render(data).each do |id, item|
        @cache.put(id, item)
      end
    end

    # Handles one queue message: parses its body, logs its id and digest, and
    # writes it out when the sequencer accepts it. Messages the sequencer
    # rejects are logged as out of sequence and dropped.
    #
    # @param msg [#id, #md5, #body] the message to process
    def receive(msg)
      data = @parser.parse(msg.body)

      @logger.info("Alephant.receive: with id #{msg.id} and body digest: #{msg.md5}")

      if @sequencer.sequential?(data, &@sequential_proc)
        write(data)
        @sequencer.set_last_seen(data, &@set_last_seen_proc)
      else
        @logger.warn("Alephant.receive: out of sequence message received #{msg.id} (discarded)")
      end
    end

    # Starts polling the queue on a new thread, passing each message to
    # #receive.
    #
    # @return [Thread] the polling thread
    def run!
      Thread.new do
        @queue.poll { |msg| receive(msg) }
      end
    end

    private

    # Copies each recognised option from +opts+ onto this instance through
    # its writer. `fetch(key, nil)` yields nil for absent keys regardless of
    # any default configured on the hash.
    def set_opts(opts)
      VALID_OPTS.each do |key|
        send("#{key}=", opts.fetch(key, nil))
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
alephant-0.0.9.6-java lib/alephant.rb
alephant-0.0.9.5-java lib/alephant.rb
alephant-0.0.9.4-java lib/alephant.rb
alephant-0.0.9.3-java lib/alephant.rb
alephant-0.0.9.2-java lib/alephant.rb