lib/dirty_pipeline/base.rb in dirty_pipeline-0.3.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.4.0
- old
+ new
@@ -62,12 +62,20 @@
def valid_statuses?(from, to)
((Array(to) + Array(from)) & RESERVED_STATUSES).empty?
end
end
+ attr_reader :subject, :error, :storage, :status, :transitions_chain
+ def initialize(subject)
+ @subject = subject
+ @storage = Storage.new(subject, self.class.pipeline_storage)
+ @status = Status.new(self)
+ @transitions_chain = []
+ end
+
def enqueue(transition_name, *args)
- Shipping::PipelineWorker.perform_async(
+ DirtyPipeline::Worker.perform_async(
"enqueued_pipeline" => self.class.to_s,
"find_subject_args" => find_subject_args,
"transition_args" => args.unshift(transition_name),
)
end
@@ -79,22 +87,31 @@
def clear!
storage.clear!
end
def cache
- storage.store["cache"]
+ storage.last_event["cache"]
end
- attr_reader :subject, :error, :storage, :status
- def initialize(subject)
- @subject = subject
- @storage = Storage.new(subject, self.class.pipeline_storage)
- @locker = Locker.new(@subject, @storage)
- @status = Status.new(self)
+ def chain(*args)
+ transitions_chain << args
+ self
end
+ def execute
+ Result() do
+ transitions_chain.each do |targs|
+ call(*targs)
+ storage.increment_transaction_depth!
+ end
+ storage.reset_transaction_depth!
+ transitions_chain.clear
+ end
+ end
+
def call(*args)
+ storage.reset_transaction_depth! if transitions_chain.empty?
Result() do
after_commit = nil
# transaction with support of external calls
transaction(*args) do |destination, action, *targs|
output = {}
@@ -108,24 +125,40 @@
end
nil
end
if fail_cause
- ExpectedError(fail_cause)
+ Failure(fail_cause)
else
Success(destination, output)
end
end
Array(after_commit).each { |cb| cb.call(subject) } if after_commit
end
end
- private
+ def schedule_retry
+ ::DirtyPipeline::Worker.perform_in(
+ retry_delay,
+ "enqueued_pipeline" => self.class.to_s,
+ "find_subject_args" => find_subject_args,
+ "retry" => true,
+ )
+ end
- attr_reader :locker
+ def schedule_cleanup
+ ::DirtyPipeline::Worker.perform_in(
+ cleanup_delay,
+ "enqueued_pipeline" => self.class.to_s,
+ "find_subject_args" => find_subject_args,
+ "transition_args" => [Locker::CLEAN],
+ )
+ end
+ private
+
def find_subject_args
subject.id
end
def retry_delay
@@ -134,99 +167,33 @@
def cleanup_delay
self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY
end
+ def transaction(*args)
+ ::DirtyPipeline::Transaction.new(self).call(*args) do |*targs|
+ yield(*targs)
+ end
+ end
+
def Result()
status.wrap { yield }
end
- def Retry(error, *args)
- storage.save_retry!(error)
- Shipping::PipelineWorker.perform_in(
- retry_delay,
- "enqueued_pipeline" => self.class.to_s,
- "find_subject_args" => find_subject_args,
- "retry" => true,
- )
- end
-
- def ExpectedError(cause)
- status.error = cause
+ def Failure(cause)
storage.fail_event!
+ status.error = cause
status.succeeded = false
end
- def Exception(error)
- storage.save_exception!(error)
- status.error = error
- status.succeeded = false
- end
-
def Abort()
status.succeeded = false
throw :abort_transaction, true
end
def Success(destination, output)
cache.clear
storage.complete!(output, destination)
status.succeeded = true
- end
-
- def try_again?(max_attempts_count)
- return unless max_attempts_count
- storage.last_event["attempts_count"].to_i < max_attempts_count
- end
-
- def find_transition(name)
- if (const_name = self.class.const_get(name) rescue nil)
- name = const_name.to_s
- end
- self.class.transitions_map.fetch(name.to_s).tap do |from:, **kwargs|
- next if from == Array(storage.status)
- next if from.include?(storage.status.to_s)
- raise InvalidTransition, "from `#{storage.status}` by `#{name}`"
- end
- end
-
- def schedule_cleanup
- Shipping::PipelineWorker.perform_in(
- cleanup_delay,
- "enqueued_pipeline" => self.class.to_s,
- "find_subject_args" => find_subject_args,
- "transition_args" => [Locker::CLEAN],
- )
- end
-
- def transaction(*args)
- locker.with_lock(*args) do |transition, *transition_args|
- begin
- schedule_cleanup
- destination, action, max_attempts_count =
- find_transition(transition).values_at(:to, :action, :attempts)
-
- status.action_pool.unshift(action)
- subject.transaction(requires_new: true) do
- raise ActiveRecord::Rollback if catch(:abort_transaction) do
- yield(destination, action, *transition_args); nil
- end
- end
- rescue => error
- if try_again?(max_attempts_count)
- Retry(error)
- else
- Exception(error)
- end
- raise
- ensure
- if status.succeeded == false
- status.action_pool.each do |reversable_action|
- next unless reversable_action.respond_to?(:undo)
- reversable_action.undo(self, *transition_args)
- end
- end
- end
- end
end
end
end