Sha256: 71b73c740cf5634894175b9b59e9c63f893e675f893d9f6f370ccf6e88ce11d6
Contents?: true
Size: 1.54 KB
Versions: 1
Compression:
Stored size: 1.54 KB
Contents
require 'jsonpath' require 'alephant/logger' module Alephant module Sequencer class Sequencer include ::Alephant::Logger attr_reader :ident, :jsonpath def initialize(sequence_table, id, sequence_path) @sequence_table = sequence_table @sequence_table.create @exists = exists? @jsonpath = sequence_path @ident = id end def sequential?(msg) (get_last_seen || 0) < Sequencer.sequence_id_from(msg, jsonpath) end def exists? @exists || @sequence_table.sequence_exists(ident) end def sequence(msg, &block) block.call(msg) last_seen_id = get_last_seen if (last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath) set_last_seen(msg, last_seen_id) else logger.info("Sequencer#sequence nonsequential message for #{ident}") end end def delete! logger.info("Sequencer#delete!: #{ident}") @exists = false @sequence_table.delete_item!(ident) end def truncate! @sequence_table.truncate! end def set_last_seen(msg, last_seen_check = nil) seen_id = Sequencer.sequence_id_from(msg, jsonpath) @sequence_table.set_sequence_for( ident, seen_id, (exists? ? last_seen_check : nil) ) end def get_last_seen @sequence_table.sequence_for(ident) end def self.sequence_id_from(msg, path) JsonPath.on(msg.body, path).first.to_i end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
alephant-sequencer-0.0.8 | lib/alephant/sequencer/sequencer.rb |