lib/eventus/persistence/kyotocabinet.rb in eventus-0.2.0 vs lib/eventus/persistence/kyotocabinet.rb in eventus-0.3.0

- old
+ new

@@ -4,21 +4,25 @@ module Persistence class KyotoCabinet def initialize(options = {}) @db = ::KyotoCabinet::DB::new - @db.open(options[:path], ::KyotoCabinet::DB::OCREATE) - @serializer = options.fetch(:serializer) { Eventus::Serializers::Marshal } + @queue = ::KyotoCabinet::DB::new + @serializer = options.delete(:serializer) || Eventus::Serializers::Marshal + queue_con = build_connection(:path => options.delete(:queue_path) || '*') + con = build_connection(options) + raise Eventus::ConnectionError unless @db.open(con) && @queue.open(queue_con) end - def commit(id, start, events) - pid = pack_hex(id) + def commit(events) @db.transaction do - events.each_with_index do |event, index| - key = build_key(pid, start + index) + events.each do |event, index| + pid = pack_hex(event['sid']) + key = build_key(pid, event['sequence']) value = @serializer.serialize(event) raise Eventus::ConcurrencyError unless @db.add(key,value) + @queue.set(key, "") end end end def load(id, min = nil) @@ -31,15 +35,49 @@ end @db.get_bulk(keys, false).values.map { |obj| @serializer.deserialize(obj) } end + def load_undispatched + events = [] + @queue.each_key do |key| + value = @db.get(key[0]) + next unless value + obj = @serializer.deserialize(value) + events << obj + end + events + end + + def mark_dispatched(stream_id, sequence) + key = build_key(pack_hex(stream_id), sequence) + @queue.remove(key) + end + def pack_hex(id) id.match(/^[0-9a-fA-F]+$/) ? [id].pack('H*') : id end def build_key(id, index) id + ("_%07d" % index) + end + + def close + @db.close + @queue.close + end + + private + + def build_connection(options) + opts = { + :path => '%', #in-memory tree + :opts => :linear + }.merge!(options) + + path = opts.delete(:path) + + opts.reduce(path) { |memo, kvp| memo << "##{kvp[0]}=#{kvp[1]}" } end end end end