lib/dirty_pipeline/transaction.rb in dirty_pipeline-0.4.0 vs lib/dirty_pipeline/transaction.rb in dirty_pipeline-0.5.0
- old
+ new
@@ -1,76 +1,56 @@
module DirtyPipeline
class Transaction
- attr_reader :locker, :storage, :subject, :pipeline
- def initialize(pipeline)
+ attr_reader :locker, :storage, :subject, :pipeline, :queue, :event
+ def initialize(pipeline, queue, event)
@pipeline = pipeline
- @storage = pipeline.storage
@subject = pipeline.subject
- @locker = Locker.new(@subject, @storage)
+ @storage = pipeline.storage
+ @queue = queue
+ @event = event
end
- def call(*args)
- locker.with_lock(*args) do |transition, *transition_args|
- pipeline.schedule_cleanup
- begin
- destination, action, max_attempts_count =
- find_transition(transition).values_at(:to, :action, :attempts)
+ def retry
+ event.attempt_retry!
+ pipeline.schedule_cleanup
- # status.action_pool.unshift(action)
- subject.transaction(requires_new: true) do
- raise ActiveRecord::Rollback if catch(:abort_transaction) do
- yield(destination, action, *transition_args); nil
- end
- end
- rescue => error
- if try_again?(max_attempts_count)
- Retry(error)
- else
- Exception(error)
- end
- raise
- ensure
- unless pipeline.status.success?
- storage.events
- .last(storage.transaction_depth)
- .reverse
- .each do |params|
- transition = params["transition"]
- targs = params["args"]
- reversable_action = find_transition(transition).fetch(:action)
- reversable_action.undo(self, *targs)
- end
- end
- end
- end
+ with_transaction { |*targs| yield(*targs) }
end
- private
+ def call
+ # return unless queue.event_in_progress?(event)
- def Retry(error, *args)
- storage.save_retry!(error)
- pipeline.schedule_retry
- end
+ event.start!
+ pipeline.schedule_cleanup
- def Exception(error)
- storage.save_exception!(error)
- pipeline.status.error = error
- pipeline.status.succeeded = false
+ with_transaction { |*targs| yield(*targs) }
end
- def try_again?(max_attempts_count)
- return false unless max_attempts_count
- storage.last_event["attempts_count"].to_i < max_attempts_count
- end
+ private
- def find_transition(name)
- if (const_name = pipeline.class.const_get(name) rescue nil)
- name = const_name.to_s
+ def with_transaction
+ destination, action, max_attempts_count =
+ pipeline.find_transition(event.transition)
+ .values_at(:to, :action, :attempts)
+
+ storage.commit!(event)
+
+ # status.action_pool.unshift(action)
+ subject.transaction(requires_new: true) do
+ raise ActiveRecord::Rollback if catch(:abort_transaction) do
+ yield(destination, action, *event.args); nil
+ end
end
- pipeline.class.transitions_map.fetch(name.to_s).tap do |from:, **kwargs|
- next if from == Array(storage.status)
- next if from.include?(storage.status.to_s)
- raise InvalidTransition, "from `#{storage.status}` by `#{name}`"
+ rescue => exception
+ event.link_exception(exception)
+ if max_attempts_count.to_i > event.attempts_count
+ event.retry!
+ pipeline.schedule_retry
+ else
+ pipeline.schedule_cleanup
end
+ raise
+ ensure
+ storage.commit!(event)
end
end
end