Sha256: a82bf006ebd0849f8c5f10bdb1b5a8774d04b1ca5fd5b5a9bd0a197ff24558af

Contents?: true

Size: 1.25 KB

Versions: 1

Compression:

Stored size: 1.25 KB

Contents

require 'jsonpath'
require 'alephant/logger'

module Alephant
  module Sequencer
    # Tracks the last-seen sequence id for one identifier in a backing
    # sequence table and decides whether incoming data is newer than it.
    #
    # The sequence id of a piece of data is read from `data.body`: through the
    # configured JSONPath expression when one was supplied, otherwise from the
    # 'sequence_id' key.
    class Sequencer
      include ::Alephant::Logger
      attr_reader :ident, :jsonpath

      # Stores the collaborators and asks the table to create itself.
      #
      # @param sequence_table [#create, #sequence_for, #set_sequence_for, #delete_item!]
      #   backing store; `create` is called on it immediately.
      # @param id identifier whose sequence is tracked (exposed as `ident`).
      # @param sequence_path [String, nil] JSONPath expression used to locate
      #   the sequence id in `data.body`; nil selects the 'sequence_id' key.
      def initialize(sequence_table, id, sequence_path = nil)
        # NOTE(review): @mutex is not referenced by any method in this class;
        # kept in case code outside this file relies on it — confirm.
        @mutex = Mutex.new
        @sequence_table = sequence_table
        @jsonpath = sequence_path
        @ident = id

        @sequence_table.create
      end

      # Tells whether `data` carries a sequence id strictly greater than the
      # last one recorded for `ident`.
      #
      # @param data [#body] message whose body holds the sequence id.
      # @return [Boolean] result of `get_last_seen < sequence id of data`.
      def sequential?(data)
        get_last_seen < sequence_id_from(data)
      end

      # Removes the stored sequence entry for `ident` from the table.
      #
      # @return whatever `sequence_table.delete_item!` returns.
      def delete!
        logger.info("Sequencer.delete!: #{ident}")
        @sequence_table.delete_item!(ident)
      end

      # Records the sequence id found in `data` as the last seen for `ident`.
      #
      # @param data [#body] message whose body holds the sequence id.
      # @return whatever `sequence_table.set_sequence_for` returns.
      def set_last_seen(data)
        last_seen_id = sequence_id_from(data)
        logger.info("Sequencer.set_last_seen: #{last_seen_id}")

        @sequence_table.set_sequence_for(ident, last_seen_id)
      end

      # Reads the last recorded sequence id for `ident` from the table.
      #
      # @return whatever `sequence_table.sequence_for` returns.
      def get_last_seen
        @sequence_table.sequence_for(ident)
      end

      private

      # Picks the extraction strategy based on whether a JSONPath was given.
      #
      # @return [Integer] sequence id of `data`.
      def sequence_id_from(data)
        if jsonpath.nil?
          default_sequence_id_for(data)
        else
          sequence_from_jsonpath_for(data)
        end
      end

      # Takes the first JSONPath match in `data.body` as the sequence id.
      #
      # The match is coerced with `to_i` so this branch returns the same type
      # as `default_sequence_id_for`: a String match such as "5" becomes 5 and
      # a missing match (nil) becomes 0, instead of breaking the `<`
      # comparison performed by `sequential?`.
      #
      # @return [Integer]
      def sequence_from_jsonpath_for(data)
        JsonPath.on(data.body, jsonpath).first.to_i
      end

      # Reads the 'sequence_id' entry of `data.body` and coerces it with `to_i`.
      #
      # @return [Integer]
      def default_sequence_id_for(data)
        data.body['sequence_id'].to_i
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
alephant-sequencer-0.0.3 lib/alephant/sequencer/sequencer.rb