Sha256: 3bf3239c42f6a7c56c7e8c8bdf85d5344c7a134900a3d079d32dd0f92490be27
Contents?: true
Size: 1.4 KB
Versions: 1
Compression:
Stored size: 1.4 KB
Contents
module Eventus module Persistence class Sequel MIGRATIONS_PATH = File.expand_path('../sequel/migrations', __FILE__).freeze TABLE_NAME = :eventus_events attr_reader :dataset, :db def initialize(db) @db = db @dataset = db[TABLE_NAME] @dataset.row_proc = method :convert_row end def commit(events) @db.transaction(:savepoint => true) do events.each do |event| event['body'] = db.typecast_value(:json, event['body']) end @dataset.multi_insert(events) end events rescue ::Sequel::UniqueConstraintViolation raise Eventus::ConcurrencyError end def load(sid, min=nil) events = @dataset.where(:sid => sid) events = events.where{ sequence >= min } if min events.all end def load_undispatched @dataset.where(:dispatched => false).all end def mark_dispatched(sid, sequence) @dataset .where(:sid => sid, :sequence => sequence) .update(:dispatched => true) end def convert_row(row) row.each_with_object({}) do |(k,v), memo| memo[k.to_s] = v end end def self.migrate!(db, target = nil) ::Sequel.extension :migration ::Sequel::Migrator.run(db, MIGRATIONS_PATH, :target => target, :table => :eventus_schema) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
eventus-0.6.3 | lib/eventus/persistence/sequel.rb |