Sha256: 03e3388e57f1841bdd85d88b246f7e654563dc3081ac72359bd60d459a6f7e95

Contents?: true

Size: 1.46 KB

Versions: 2

Compression:

Stored size: 1.46 KB

Contents

require 'jsonpath'

module Alephant
  module Sequencer
    @@sequence_tables = {}

    def self.create(table_name, ident, jsonpath = nil)
      @@sequence_tables[table_name] ||= SequenceTable.new(table_name)
      Sequencer.new(@@sequence_tables[table_name], ident, jsonpath)
    end

    class Sequencer
      attr_reader :ident, :jsonpath

      def initialize(sequence_table, id, sequence_path = nil)
        @mutex = Mutex.new
        @sequence_table = sequence_table
        @jsonpath = sequence_path
        @ident = id

        ::Alephant.logger.info("Sequencer.initialize: with id #{@ident}")
        @sequence_table.create
      end

      def sequential?(data)
        get_last_seen < sequence_id_from(data)
      end

      def delete!
        @sequence_table.delete_item!(ident)
      end

      def set_last_seen(data)
        last_seen_id = sequence_id_from(data)

        @sequence_table.set_sequence_for(ident, last_seen_id)
        ::Alephant.logger.info("Sequencer.set_last_seen: #{ident}:#{last_seen_id}")
      end

      def get_last_seen
        @sequence_table.sequence_for(ident)
      end

      private
      def sequence_id_from(data)
        jsonpath.nil? ?
          default_sequence_id_for(data) :
          sequence_from_jsonpath_for(data)
      end

      def sequence_from_jsonpath_for(data)
        JsonPath.on(data.body, jsonpath).first
      end

      def default_sequence_id_for(data)
        data.body['sequence_id'].to_i
      end

    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
alephant-0.0.9.9.1-java lib/alephant/models/sequencer.rb
alephant-0.0.9.9-java lib/alephant/models/sequencer.rb