lib/alephant/sequencer/sequencer.rb in alephant-sequencer-0.0.8 vs lib/alephant/sequencer/sequencer.rb in alephant-sequencer-0.1.0

- old
+ new

@@ -3,20 +3,21 @@ require 'alephant/logger' module Alephant module Sequencer class Sequencer - include ::Alephant::Logger - attr_reader :ident, :jsonpath + include Logger + attr_reader :ident, :jsonpath, :keep_all - def initialize(sequence_table, id, sequence_path) + def initialize(sequence_table, id, sequence_path, keep_all = true) @sequence_table = sequence_table @sequence_table.create - @exists = exists? + @keep_all = keep_all + @exists = exists? @jsonpath = sequence_path - @ident = id + @ident = id end def sequential?(msg) (get_last_seen || 0) < Sequencer.sequence_id_from(msg, jsonpath) end @@ -24,13 +25,15 @@ def exists? @exists || @sequence_table.sequence_exists(ident) end def sequence(msg, &block) - block.call(msg) - last_seen_id = get_last_seen - if (last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath) + sequential = ((last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath)) + + block.call(msg) if (sequential || keep_all) + + if sequential set_last_seen(msg, last_seen_id) else logger.info("Sequencer#sequence nonsequential message for #{ident}") end end