Sha256: 71b73c740cf5634894175b9b59e9c63f893e675f893d9f6f370ccf6e88ce11d6

Contents?: true

Size: 1.54 KB

Versions: 1

Compression:

Stored size: 1.54 KB

Contents

require 'jsonpath'

require 'alephant/logger'

module Alephant
  module Sequencer
    # Tracks the last sequence id seen for a single identifier, using a
    # sequence table object for storage, and decides whether an incoming
    # message is newer than what has already been recorded.
    #
    # The sequence id of a message is read from +msg.body+ with the JSONPath
    # expression supplied at construction time.
    class Sequencer
      include ::Alephant::Logger
      attr_reader :ident, :jsonpath

      # @param sequence_table [#create, #sequence_exists, #sequence_for,
      #   #set_sequence_for, #delete_item!, #truncate!] storage backend
      # @param id [Object] identifier whose sequence is tracked
      # @param sequence_path [String] JSONPath locating the sequence id in
      #   a message body
      def initialize(sequence_table, id, sequence_path)
        @sequence_table = sequence_table
        @sequence_table.create

        # Assign the identifier *before* asking the table whether it exists:
        # exists? reads +ident+, which would otherwise still be nil here and
        # the cached flag would describe the wrong (nil) key.
        @ident = id
        @jsonpath = sequence_path
        @exists = exists?
      end

      # @param msg [#body] message to test
      # @return [Boolean] true when the message's sequence id is strictly
      #   greater than the last seen id (a missing last seen id counts as 0)
      def sequential?(msg)
        newer_than?(get_last_seen, msg)
      end

      # @return [Boolean, Object] the cached truthy flag, or otherwise
      #   whatever the table's +sequence_exists+ returns for this identifier
      def exists?
        @exists || @sequence_table.sequence_exists(ident)
      end

      # Calls the block with the message, then records the message's sequence
      # id if it is newer than the last seen one; otherwise logs that the
      # message was nonsequential.
      #
      # @param msg [#body] message to process
      # @yieldparam msg the same message
      def sequence(msg, &block)
        block.call(msg)

        # Read the stored value once so the comparison and the value handed
        # to set_last_seen refer to the same observation.
        last_seen_id = get_last_seen
        if newer_than?(last_seen_id, msg)
          set_last_seen(msg, last_seen_id)
        else
          logger.info("Sequencer#sequence nonsequential message for #{ident}")
        end
      end

      # Removes this identifier's entry from the table and clears the cached
      # existence flag.
      def delete!
        logger.info("Sequencer#delete!: #{ident}")
        @exists = false
        @sequence_table.delete_item!(ident)
      end

      # Delegates to the table's +truncate!+.
      def truncate!
        @sequence_table.truncate!
      end

      # Stores the message's sequence id for this identifier.
      #
      # @param msg [#body] message whose sequence id is stored
      # @param last_seen_check [Object, nil] previously observed id; only
      #   forwarded when the sequence already exists, otherwise nil is sent.
      #   NOTE(review): presumably the table uses it as a conditional-write
      #   guard -- confirm against the table implementation.
      def set_last_seen(msg, last_seen_check = nil)
        seen_id = Sequencer.sequence_id_from(msg, jsonpath)

        @sequence_table.set_sequence_for(
          ident, seen_id,
          (exists? ? last_seen_check : nil)
        )
      end

      # @return [Object, nil] whatever the table's +sequence_for+ returns for
      #   this identifier
      def get_last_seen
        @sequence_table.sequence_for(ident)
      end

      # Extracts the sequence id from a message.
      #
      # @param msg [#body] message whose body is queried
      # @param path [String] JSONPath expression
      # @return [Integer] first match converted with +to_i+ (0 when there is
      #   no match, since +nil.to_i+ is 0)
      def self.sequence_id_from(msg, path)
        JsonPath.on(msg.body, path).first.to_i
      end

      private

      # True when the message's sequence id is strictly greater than
      # +last_seen_id+, treating a nil/false +last_seen_id+ as 0.
      def newer_than?(last_seen_id, msg)
        (last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath)
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
alephant-sequencer-0.0.8 lib/alephant/sequencer/sequencer.rb