Sha256: 4caeee963e27dbf9bed2a2b5e8227bfe59b3e72d4fd69011f24d98755a00d091

Contents?: true

Size: 1.59 KB

Versions: 2

Compression:

Stored size: 1.59 KB

Contents

require 'aws-sdk'

module Alephant
  # Tracks the last sequence id seen for a given +id+, persisting it as a
  # single item (attributes :key and :value) in a DynamoDB table.
  class Sequencer
    attr_reader :id, :table_name, :table_conf

    # Default provisioning and schema used when the table has to be created.
    #
    # @return [Hash] a freshly built hash on every call
    def table_conf_defaults
      {
        :write_units => 5,
        :read_units => 10,
        :schema => {
          :hash_key => {
            :key => :string,
            :value => :string
          }
        }
      }
    end

    # Looks up the backing table and blocks until it reports :active,
    # creating it first when DynamoDB says it does not exist.
    #
    # @param opts [Hash] :table_name (required) and an optional :table_conf
    #   hash; any of :read_units, :write_units, :schema that it omits fall
    #   back to #table_conf_defaults
    # @param id [Object] key under which this sequencer's value is stored
    def initialize(opts, id)
      dynamo_db = AWS::DynamoDB.new

      @id = id
      @table_name = opts[:table_name]
      # Merge instead of replacing, so a partial override (e.g. only
      # :read_units) does not hand nil units or a nil schema to #create.
      @table_conf = table_conf_defaults.merge(opts[:table_conf] || {})
      @table = dynamo_db.tables[@table_name]

      begin
        sleep_until_table_active
      rescue AWS::DynamoDB::Errors::ResourceNotFoundException
        # Polling the status of a missing table raised: create it, then wait.
        @table = dynamo_db.tables.create(
          @table_name,
          @table_conf[:read_units],
          @table_conf[:write_units],
          @table_conf[:schema]
        )

        sleep_until_table_active
      end
    end

    # Decides whether +data+ is newer than the last value seen.
    #
    # With a block, the decision is delegated: the block receives the last
    # seen id and +data+, and its result is returned. Without one, the
    # integer value of data["sequence_id"] must be strictly greater than the
    # last seen id.
    #
    # @param data [Hash] message payload
    # @return [Object] the block's result, or true/false without a block
    def sequential?(data)
      if block_given?
        yield(get_last_seen, data)
      else
        get_last_seen < data["sequence_id"].to_i
      end
    end

    # Stores a new last-seen id for this sequencer.
    #
    # With a block, the id to store is whatever the block returns for
    # +data+; otherwise it is data["sequence_id"], stored as given.
    #
    # @param data [Hash] message payload
    # @return [Object] whatever the batch write's #process! returns
    def set_last_seen(data)
      last_seen_id = block_given? ? yield(data) : data["sequence_id"]

      batch = AWS::DynamoDB::BatchWrite.new
      batch.put(@table_name, [:key => @id, :value => last_seen_id])
      batch.process!
    end

    # Reads the stored last-seen id with a consistent read.
    #
    # @return [Integer] the stored value converted with #to_i, or 0 when no
    #   item is stored or the lookup raises
    def get_last_seen
      item = @table.batch_get(
        ['value'],
        [@id],
        {
          :consistent_read => true
        }
      ).first

      # No item yet for this id: nothing has been seen.
      item ? item["value"].to_i : 0
    rescue StandardError
      # Deliberate best-effort: any lookup failure is treated as "nothing seen".
      0
    end

    # Polls the table status once per second until it is :active.
    # NOTE(review): there is no upper bound on the wait; a table that never
    # becomes :active blocks the caller indefinitely.
    def sleep_until_table_active
      sleep 1 until @table.status == :active
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
alephant-0.0.6-java lib/alephant/models/sequencer.rb
alephant-0.0.5-java lib/alephant/models/sequencer.rb