Sha256: 3bf3239c42f6a7c56c7e8c8bdf85d5344c7a134900a3d079d32dd0f92490be27

Contents?: true

Size: 1.4 KB

Versions: 1

Compression:

Stored size: 1.4 KB

Contents

module Eventus
  module Persistence
    # Event store backed by a Sequel database connection.
    #
    # Events live in the +eventus_events+ table and are exposed to callers as
    # hashes with String keys (see #convert_row).
    class Sequel
      # Directory holding the Sequel migration files used by .migrate!.
      MIGRATIONS_PATH = File.expand_path('../sequel/migrations', __FILE__).freeze
      # Name of the table the events are stored in.
      TABLE_NAME = :eventus_events

      attr_reader :dataset, :db

      # @param db [::Sequel::Database] connection used for every query
      def initialize(db)
        @db = db
        table = db[TABLE_NAME]
        row_converter = method(:convert_row)

        # Sequel 5 datasets are frozen and no longer offer `row_proc=`, so
        # prefer the non-mutating `with_row_proc`; fall back to the setter for
        # datasets that do not provide it.
        if table.respond_to?(:with_row_proc)
          @dataset = table.with_row_proc(row_converter)
        else
          table.row_proc = row_converter
          @dataset = table
        end
      end

      # Inserts all events inside one transaction (a savepoint when already
      # inside a transaction).
      #
      # NOTE: each event's 'body' is replaced in place with its JSON-typecast
      # value, so the hashes passed in are mutated.
      #
      # @param events [Array<Hash>] event hashes with String keys
      # @return [Array<Hash>] the same +events+ array
      # @raise [Eventus::ConcurrencyError] when a unique constraint is violated
      def commit(events)
        @db.transaction(:savepoint => true) do
          events.each do |event|
            event['body'] = db.typecast_value(:json, event['body'])
          end
          @dataset.multi_insert(events)
        end
        events
      rescue ::Sequel::UniqueConstraintViolation
        raise Eventus::ConcurrencyError
      end

      # Loads the events of one stream in ascending sequence order.
      #
      # @param sid the stream id to load
      # @param min lowest sequence to include; +nil+ loads the whole stream
      # @return [Array<Hash>] matching events with String keys
      def load(sid, min=nil)
        stream = @dataset.where(:sid => sid)
        stream = stream.where{ sequence >= min } if min
        # Without an explicit ORDER BY the database may return rows in any
        # order, which would scramble the stream.
        stream.order(:sequence).all
      end

      # @return [Array<Hash>] every event whose +dispatched+ flag is false
      def load_undispatched
        @dataset.where(:dispatched => false).all
      end

      # Sets the +dispatched+ flag on the event identified by +sid+ and
      # +sequence+.
      #
      # @return whatever the dataset's +update+ call returns
      def mark_dispatched(sid, sequence)
        @dataset
          .where(:sid => sid, :sequence => sequence)
          .update(:dispatched => true)
      end

      # Row proc for the dataset: copies a row into a new hash whose keys are
      # converted with +to_s+.
      #
      # @param row [Hash] row as yielded by Sequel
      # @return [Hash] the same values under String keys
      def convert_row(row)
        row.each_with_object({}) do |(k,v), memo|
          memo[k.to_s] = v
        end
      end

      # Runs the bundled migrations against +db+, tracking schema state in the
      # +eventus_schema+ table.
      #
      # @param db [::Sequel::Database] database to migrate
      # @param target migration version to migrate to; passed through to the
      #   migrator as +:target+ (+nil+ by default)
      def self.migrate!(db, target = nil)
        ::Sequel.extension :migration
        ::Sequel::Migrator.run(db, MIGRATIONS_PATH, :target => target, :table => :eventus_schema)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
eventus-0.6.3 lib/eventus/persistence/sequel.rb