lib/alephant/sequencer/sequencer.rb in alephant-sequencer-0.0.8 vs lib/alephant/sequencer/sequencer.rb in alephant-sequencer-0.1.0
- old
+ new
@@ -3,20 +3,21 @@
require 'alephant/logger'
module Alephant
module Sequencer
class Sequencer
- include ::Alephant::Logger
- attr_reader :ident, :jsonpath
+ include Logger
+ attr_reader :ident, :jsonpath, :keep_all
- def initialize(sequence_table, id, sequence_path)
+ def initialize(sequence_table, id, sequence_path, keep_all = true)
@sequence_table = sequence_table
@sequence_table.create
- @exists = exists?
+ @keep_all = keep_all
+ @exists = exists?
@jsonpath = sequence_path
- @ident = id
+ @ident = id
end
def sequential?(msg)
(get_last_seen || 0) < Sequencer.sequence_id_from(msg, jsonpath)
end
@@ -24,13 +25,15 @@
def exists?
@exists || @sequence_table.sequence_exists(ident)
end
def sequence(msg, &block)
- block.call(msg)
-
last_seen_id = get_last_seen
- if (last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath)
+ sequential = ((last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath))
+
+ block.call(msg) if (sequential || keep_all)
+
+ if sequential
set_last_seen(msg, last_seen_id)
else
logger.info("Sequencer#sequence nonsequential message for #{ident}")
end
end