Sha256: f005e0c702af49aab5d561cd8855ce5322218355002680f078d845ed92ee7dea

Contents?: true

Size: 1.84 KB

Versions: 1

Compression:

Stored size: 1.84 KB

Contents

module DirtyPipeline
  class Railway
    OPERATIONS = %w(call undo finalize)

    def initialize(subject, transaction_id)
      @tx_id = transaction_id
      @root = "dirty-pipeline-rail:#{subject.class}:#{subject.id}:" \
              ":txid_#{transaction_id}"
      @queues = Hash[
        OPERATIONS.map do |operation|
          [operation, Queue.new(operation, subject, transaction_id)]
        end
      ]
    end

    def clear!
      @queues.values.each(&:clear!)
      DirtyPipeline.with_redis { |r| r.del(active_operation_key) }
    end

    def next
      return if other_transaction_in_progress?
      start_transaction! if running_transaction.nil?

      queue.pop.tap { |event| finish_transaction! if event.nil? }
    end

    def queue(name = active)
      @queues[name.to_s]
    end
    alias :[] :queue

    def switch_to(name)
      raise ArgumentError unless OPERATIONS.include?(name.to_s)
      return if name.to_s == active
      DirtyPipeline.with_redis { |r| r.set(active_operation_key, name) }
    end

    def active
      DirtyPipeline.with_redis { |r| r.get(active_operation_key) }
    end
    alias :operation :active

    private

    def active_transaction_key
      "#{@root}:active_transaction"
    end

    def active_operation_key
      "#{@root}:active_operation"
    end

    def start_transaction!
      switch_to(OPERATIONS.first)
      DirtyPipeline.with_redis { |r| r.set(active_transaction_key, @tx_id) }
    end

    def finish_transaction!
      return unless running_transaction == @tx_id
      DirtyPipeline.with_redis { |r| r.del(active_transaction_key) }
      @queues.values.each(&:clear!)
    end

   def running_transaction
     DirtyPipeline.with_redis { |r| r.get(active_transaction_key) }
   end

    def other_transaction_in_progress?
      return false if running_transaction.nil?
      running_transaction != @tx_id
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
dirty_pipeline-0.5.0 lib/dirty_pipeline/railway.rb