Sha256: 798ae67ad7f9087afe24b70e34c0c1a3a6d56bd4c05a78808280f96d4a3711cb
Contents?: true
Size: 1.79 KB
Versions: 1
Compression:
Stored size: 1.79 KB
Contents
module Eventus module Persistence class Sequel MIGRATIONS_PATH = File.expand_path('../sequel/migrations', __FILE__).freeze TABLE_NAME = :eventus_events attr_reader :dataset, :db def initialize(db) @db = db @dataset = db[TABLE_NAME] @dataset.row_proc = method :convert_row end def commit(events) @db.transaction(:savepoint => true) do events.each do |event| event['body'] = db.typecast_value(:json, event['body'] || {}) end @dataset.multi_insert(events) end events rescue ::Sequel::UniqueConstraintViolation raise Eventus::ConcurrencyError end def load(sid, min = nil, max = nil) _load(sid, nil, min, max) end def load_undispatched(sid = nil) _load(sid, false) end def _load(sid = nil, dispatched = nil, min = nil, max = nil) events = @dataset events = events.where(:sid => sid) unless sid.nil? events = events.where(:dispatched => dispatched) unless dispatched.nil? events = events.where{ sequence >= min } unless min.nil? events = events.where{ sequence <= max } unless max.nil? events.order_by(:sequence).all end def mark_dispatched(sid, sequence) @dataset .where(:sid => sid, :sequence => sequence) .update(:dispatched => true) end def convert_row(row) res = row.each_with_object({}) do |(k,v), memo| memo[k.to_s] = v end res['body'] = res.fetch('body', {}).to_hash res end def self.migrate!(db, target = nil) ::Sequel.extension :migration ::Sequel::Migrator.run(db, MIGRATIONS_PATH, :target => target, :table => :eventus_schema) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
eventus-0.6.7 | lib/eventus/persistence/sequel.rb |