Sha256: 08c0b0baf1b0a2217900e366533ba045e5e304e79364ce6a683391ca8063a0d7

Contents?: true

Size: 1.27 KB

Versions: 4

Compression:

Stored size: 1.27 KB

Contents

# frozen_string_literal: true

module PgEventstore
  # Runs a unit of work inside a database transaction that is switched to the SERIALIZABLE isolation level. Work
  # that fails with a serialization failure, a deadlock, or a MissingPartitions error is re-run from the start.
  # @!visibility private
  class TransactionQueries
    # @param connection [PgEventstore::Connection]
    def initialize(connection)
      @connection = connection
    end

    # Yields inside a transaction. If the connection handed out by +connection.with+ already reports an open
    # transaction, the block is yielded to directly and no new transaction is started.
    #
    # The given block may be executed more than once: it is re-run after every serialization failure, deadlock or
    # MissingPartitions error, and there is no upper bound on the number of attempts.
    # @return [void]
    def transaction
      connection.with do |pg_conn|
        # Already inside a transaction on this connection - run the block as part of it
        next yield if transaction_open?(pg_conn)

        pg_transaction(pg_conn) { yield }
      end
    end

    private

    attr_reader :connection

    # @param pg_connection [PG::Connection]
    # @return [Boolean] whether the connection reports the ACTIVE or INTRANS transaction status
    def transaction_open?(pg_connection)
      case pg_connection.transaction_status
      when PG::PQTRANS_ACTIVE, PG::PQTRANS_INTRANS then true
      else false
      end
    end

    # Opens a transaction on the given connection, sets its isolation level and yields. The whole method body is
    # restarted when the transaction fails with a serialization failure or a deadlock, and also after the partitions
    # named by a MissingPartitions error have been created.
    # @param pg_connection [PG::Connection]
    # @return [void]
    def pg_transaction(pg_connection)
      pg_connection.transaction do
        pg_connection.exec("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        yield
      end
    rescue PG::TRSerializationFailure, PG::TRDeadlockDetected
      retry
    rescue MissingPartitions => missing
      create_missing_partitions(missing)
      retry
    end

    # Creates partitions for every event type listed by the error, one event type at a time.
    # @param missing [PgEventstore::MissingPartitions]
    # @return [void]
    def create_missing_partitions(missing)
      missing.event_types.each do |event_type|
        create_partitions_for(missing, event_type)
      end
    end

    # Creates partitions for a single event type in a transaction of its own. A PG::UniqueViolation restarts the
    # attempt for that same event type.
    # @param missing [PgEventstore::MissingPartitions]
    # @param event_type [Object] one element of +missing.event_types+
    # @return [void]
    def create_partitions_for(missing, event_type)
      transaction { partition_queries.create_partitions(missing.stream, event_type) }
    rescue PG::UniqueViolation
      retry
    end

    # @return [PgEventstore::PartitionQueries] a new instance built on the same connection
    def partition_queries
      PartitionQueries.new(connection)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
pg_eventstore-1.0.0.rc2 lib/pg_eventstore/queries/transaction_queries.rb
pg_eventstore-1.0.0.rc1 lib/pg_eventstore/queries/transaction_queries.rb
pg_eventstore-0.10.2 lib/pg_eventstore/queries/transaction_queries.rb
pg_eventstore-0.10.1 lib/pg_eventstore/queries/transaction_queries.rb