Sha256: 77c4ceb8919aa2f6ade0536a17353dca3ef73b926985d910fdc8b21c4e0c90b8

Contents?: true

Size: 1.83 KB

Versions: 11

Compression:

Stored size: 1.83 KB

Contents

# frozen_string_literal: true

module Kafka
  # Mutex-guarded state machine that tracks which state a transaction is in.
  #
  # A new machine starts in UNINITIALIZED. The state only changes through
  # #transition_to!, which validates every move against TRANSITIONS.
  class TransactionStateMachine
    # Raised when the requested move is not allowed from the current state.
    class InvalidTransitionError < StandardError; end
    # Raised when the requested target is not one of STATES.
    class InvalidStateError < StandardError; end

    # Every state the machine can be in.
    STATES = [
      UNINITIALIZED          = :uninitialized,
      READY                  = :ready,
      IN_TRANSACTION         = :in_transaction,
      COMMITTING_TRANSACTION = :committing_transaction,
      ABORTING_TRANSACTION   = :aborting_transaction,
      ERROR                  = :error
    ].freeze

    # Keyed by the TARGET state; each value lists the states from which that
    # target may be entered (see the lookup in #transition_to!).
    TRANSITIONS = {
      UNINITIALIZED          => [READY, ERROR].freeze,
      READY                  => [UNINITIALIZED, COMMITTING_TRANSACTION, ABORTING_TRANSACTION].freeze,
      IN_TRANSACTION         => [READY].freeze,
      COMMITTING_TRANSACTION => [IN_TRANSACTION].freeze,
      ABORTING_TRANSACTION   => [IN_TRANSACTION].freeze,
      # Any states can transition to error state
      ERROR                  => STATES
    }.freeze

    # @param logger [#debug] receives one debug message per successful transition
    def initialize(logger:)
      @state = UNINITIALIZED
      @mutex = Mutex.new
      @logger = logger
    end

    # Moves the machine to +next_state+.
    #
    # @param next_state [Symbol] one of STATES
    # @return [Symbol] the state that was entered
    # @raise [InvalidStateError] if +next_state+ is not one of STATES
    # @raise [InvalidTransitionError] if +next_state+ cannot be entered from
    #   the current state; the current state is left untouched
    def transition_to!(next_state)
      unless STATES.include?(next_state)
        raise InvalidStateError, "Unknown transaction state '#{next_state.inspect}'"
      end

      # Validate and assign inside one critical section so that two threads
      # cannot both pass the check against the same old state.
      @mutex.synchronize do
        unless TRANSITIONS[next_state].include?(@state)
          raise InvalidTransitionError, "Could not transition from state '#{@state}' to state '#{next_state}'"
        end
        @state = next_state
      end

      # Log after releasing the lock so the logger never runs while it is held.
      @logger.debug("Transaction state changed to '#{next_state}'!")
      next_state
    end

    # @return [Boolean] whether the current state is UNINITIALIZED
    def uninitialized?
      in_state?(UNINITIALIZED)
    end

    # @return [Boolean] whether the current state is READY
    def ready?
      in_state?(READY)
    end

    # @return [Boolean] whether the current state is IN_TRANSACTION
    def in_transaction?
      in_state?(IN_TRANSACTION)
    end

    # @return [Boolean] whether the current state is COMMITTING_TRANSACTION
    def committing_transaction?
      in_state?(COMMITTING_TRANSACTION)
    end

    # @return [Boolean] whether the current state is ABORTING_TRANSACTION
    def aborting_transaction?
      in_state?(ABORTING_TRANSACTION)
    end

    # @return [Boolean] whether the current state is ERROR
    def error?
      in_state?(ERROR)
    end

    private

    # Compares the current state with +state+ while holding the mutex.
    #
    # @param state [Symbol]
    # @return [Boolean]
    def in_state?(state)
      @mutex.synchronize { @state == state }
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
ruby-kafka-0.7.5 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.5.beta1 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.4 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.3 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.2 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.1.debugcorruption2 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.1.messagecorruptiondebug lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.1 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.1.beta2 lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.1.possible.pre.fetch.pre.fix lib/kafka/transaction_state_machine.rb
ruby-kafka-0.7.1.beta1 lib/kafka/transaction_state_machine.rb