lib/dirty_pipeline/transaction.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/transaction.rb in dirty_pipeline-0.6.0
- old
+ new
@@ -1,56 +1,52 @@
module DirtyPipeline
class Transaction
- attr_reader :locker, :storage, :subject, :pipeline, :queue, :event
- def initialize(pipeline, queue, event)
+ attr_reader :locker, :storage, :subject, :pipeline, :event
+ def initialize(pipeline, event)
@pipeline = pipeline
@subject = pipeline.subject
@storage = pipeline.storage
- @queue = queue
@event = event
end
- def retry
- event.attempt_retry!
- pipeline.schedule_cleanup
-
+ def call
+ event.start!
with_transaction { |*targs| yield(*targs) }
end
- def call
- # return unless queue.event_in_progress?(event)
-
- event.start!
- pipeline.schedule_cleanup
-
+ def retry
+ event.attempt_retry!
with_transaction { |*targs| yield(*targs) }
end
private
def with_transaction
+ pipeline.schedule_cleanup
+
destination, action, max_attempts_count =
pipeline.find_transition(event.transition)
.values_at(:to, :action, :attempts)
storage.commit!(event)
- # status.action_pool.unshift(action)
subject.transaction(requires_new: true) do
- raise ActiveRecord::Rollback if catch(:abort_transaction) do
- yield(destination, action, *event.args); nil
- end
+ with_abort_handling { yield(destination, action, *event.args) }
end
rescue => exception
event.link_exception(exception)
if max_attempts_count.to_i > event.attempts_count
event.retry!
pipeline.schedule_retry
- else
- pipeline.schedule_cleanup
end
raise
ensure
storage.commit!(event)
+ end
+
+ def with_abort_handling
+ return unless catch(:abort_transaction) { yield; nil }
+ event.abort! unless event.abort?
+ raise ActiveRecord::Rollback
end
end
end