lib/dirty_pipeline/railway.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/railway.rb in dirty_pipeline-0.6.0

- old
+ new

@@ -1,71 +1,81 @@ module DirtyPipeline class Railway - OPERATIONS = %w(call undo finalize) + DEFAULT_OPERATIONS = %w(call undo finalize) def initialize(subject, transaction_id) @tx_id = transaction_id - @root = "dirty-pipeline-rail:#{subject.class}:#{subject.id}:" \ - ":txid_#{transaction_id}" + @subject_class = subject.class.to_s + @subject_id = subject.id.to_s + @root = "dirty-pipeline-rail:#{subject.class}:#{subject.id}:" @queues = Hash[ - OPERATIONS.map do |operation| - [operation, Queue.new(operation, subject, transaction_id)] + DEFAULT_OPERATIONS.map do |operation| + [operation, create_queue(operation)] end ] end def clear! @queues.values.each(&:clear!) - DirtyPipeline.with_redis { |r| r.del(active_operation_key) } + DirtyPipeline.with_redis do |r| + r.multi do |mr| + mr.del(active_operation_key) + mr.del(active_transaction_key) + end + end end def next return if other_transaction_in_progress? - start_transaction! if running_transaction.nil? + start_transaction! unless running_transaction queue.pop.tap { |event| finish_transaction! if event.nil? } end - def queue(name = active) - @queues[name.to_s] + def queue(operation_name = active) + @queues.fetch(operation_name.to_s) do + @queues.store(operation_name, create_queue(operation_name)) + end end alias :[] :queue def switch_to(name) - raise ArgumentError unless OPERATIONS.include?(name.to_s) + raise ArgumentError unless DEFAULT_OPERATIONS.include?(name.to_s) return if name.to_s == active DirtyPipeline.with_redis { |r| r.set(active_operation_key, name) } end def active DirtyPipeline.with_redis { |r| r.get(active_operation_key) } end alias :operation :active + def running_transaction + DirtyPipeline.with_redis { |r| r.get(active_transaction_key) } + end + private + def create_queue(operation_name) + Queue.new(operation_name, @subject_class, @subject_id, @tx_id) + end + def active_transaction_key "#{@root}:active_transaction" end def active_operation_key "#{@root}:active_operation" end def start_transaction! - switch_to(OPERATIONS.first) + switch_to(DEFAULT_OPERATIONS.first) unless active DirtyPipeline.with_redis { |r| r.set(active_transaction_key, @tx_id) } end def finish_transaction! - return unless running_transaction == @tx_id - DirtyPipeline.with_redis { |r| r.del(active_transaction_key) } - @queues.values.each(&:clear!) + clear! if running_transaction == @tx_id end - - def running_transaction - DirtyPipeline.with_redis { |r| r.get(active_transaction_key) } - end def other_transaction_in_progress? return false if running_transaction.nil? running_transaction != @tx_id end