Sha256: 2edc95f2cd5d1ab6ff94a376a3d0de06c8e199f0ad59b784bf005cc1b6a39b91

Contents?: true

Size: 1.63 KB

Versions: 1

Compression:

Stored size: 1.63 KB

Contents

require 'jsonpath'

require 'alephant/logger'

module Alephant
  module Sequencer
    class Sequencer
      include Logger
      attr_reader :ident, :jsonpath, :keep_all

      def initialize(sequence_table, id, sequence_path, keep_all = true)
        @sequence_table = sequence_table

        @keep_all = keep_all
        @exists   = exists?
        @jsonpath = sequence_path
        @ident    = id
      end

      def sequential?(msg)
        (get_last_seen || 0) < Sequencer.sequence_id_from(msg, jsonpath)
      end

      def exists?
        @exists || @sequence_table.sequence_exists(ident)
      end

      def validate(msg, &block)
        last_seen_id = get_last_seen
        sequential = ((last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath))

        block.call(msg) if (sequential || keep_all)

        if sequential
          set_last_seen(msg, last_seen_id)
        else
          logger.info("Sequencer#sequence nonsequential message for #{ident}")
        end
      end

      def delete!
        logger.info("Sequencer#delete!: #{ident}")
        @exists = false
        @sequence_table.delete_item!(ident)
      end

      def truncate!
        @sequence_table.truncate!
      end

      def set_last_seen(msg, last_seen_check = nil)
        seen_id = Sequencer.sequence_id_from(msg, jsonpath)

        @sequence_table.set_sequence_for(
          ident, seen_id,
          (exists? ? last_seen_check : nil)
        )
      end

      def get_last_seen(key = ident)
        @sequence_table.sequence_for(key)
      end

      def self.sequence_id_from(msg, path)
        JsonPath.on(msg.body, path).first.to_i
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
alephant-sequencer-1.0.0 lib/alephant/sequencer/sequencer.rb