lib/eventus/persistence/kyotocabinet.rb in eventus-0.2.0 vs lib/eventus/persistence/kyotocabinet.rb in eventus-0.3.0
- old
+ new
@@ -4,21 +4,25 @@
module Persistence
class KyotoCabinet
def initialize(options = {})
@db = ::KyotoCabinet::DB::new
- @db.open(options[:path], ::KyotoCabinet::DB::OCREATE)
- @serializer = options.fetch(:serializer) { Eventus::Serializers::Marshal }
+ @queue = ::KyotoCabinet::DB::new
+ @serializer = options.delete(:serializer) || Eventus::Serializers::Marshal
+ queue_con = build_connection(:path => options.delete(:queue_path) || '*')
+ con = build_connection(options)
+ raise Eventus::ConnectionError unless @db.open(con) && @queue.open(queue_con)
end
- def commit(id, start, events)
- pid = pack_hex(id)
+ def commit(events)
@db.transaction do
- events.each_with_index do |event, index|
- key = build_key(pid, start + index)
+ events.each do |event, index|
+ pid = pack_hex(event['sid'])
+ key = build_key(pid, event['sequence'])
value = @serializer.serialize(event)
raise Eventus::ConcurrencyError unless @db.add(key,value)
+ @queue.set(key, "")
end
end
end
def load(id, min = nil)
@@ -31,15 +35,49 @@
end
@db.get_bulk(keys, false).values.map { |obj| @serializer.deserialize(obj) }
end
+ def load_undispatched
+ events = []
+ @queue.each_key do |key|
+ value = @db.get(key[0])
+ next unless value
+ obj = @serializer.deserialize(value)
+ events << obj
+ end
+ events
+ end
+
+ def mark_dispatched(stream_id, sequence)
+ key = build_key(pack_hex(stream_id), sequence)
+ @queue.remove(key)
+ end
+
def pack_hex(id)
id.match(/^[0-9a-fA-F]+$/) ? [id].pack('H*') : id
end
def build_key(id, index)
id + ("_%07d" % index)
+ end
+
+ def close
+ @db.close
+ @queue.close
+ end
+
+ private
+
+ def build_connection(options)
+ opts = {
+ :path => '%', #in-memory tree
+ :opts => :linear
+ }.merge!(options)
+
+ path = opts.delete(:path)
+
+ opts.reduce(path) { |memo, kvp| memo << "##{kvp[0]}=#{kvp[1]}" }
end
end
end
end