lib/dirty_pipeline/base.rb in dirty_pipeline-0.3.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.4.0

- old
+ new

@@ -62,12 +62,20 @@ def valid_statuses?(from, to) ((Array(to) + Array(from)) & RESERVED_STATUSES).empty? end end + attr_reader :subject, :error, :storage, :status, :transitions_chain + def initialize(subject) + @subject = subject + @storage = Storage.new(subject, self.class.pipeline_storage) + @status = Status.new(self) + @transitions_chain = [] + end + def enqueue(transition_name, *args) - Shipping::PipelineWorker.perform_async( + DirtyPipeline::Worker.perform_async( "enqueued_pipeline" => self.class.to_s, "find_subject_args" => find_subject_args, "transition_args" => args.unshift(transition_name), ) end @@ -79,22 +87,31 @@ def clear! storage.clear! end def cache - storage.store["cache"] + storage.last_event["cache"] end - attr_reader :subject, :error, :storage, :status - def initialize(subject) - @subject = subject - @storage = Storage.new(subject, self.class.pipeline_storage) - @locker = Locker.new(@subject, @storage) - @status = Status.new(self) + def chain(*args) + transitions_chain << args + self end + def execute + Result() do + transitions_chain.each do |targs| + call(*targs) + storage.increment_transaction_depth! + end + storage.reset_transaction_depth! + transitions_chain.clear + end + end + def call(*args) + storage.reset_transaction_depth! if transitions_chain.empty? Result() do after_commit = nil # transaction with support of external calls transaction(*args) do |destination, action, *targs| output = {} @@ -108,24 +125,40 @@ end nil end if fail_cause - ExpectedError(fail_cause) + Failure(fail_cause) else Success(destination, output) end end Array(after_commit).each { |cb| cb.call(subject) } if after_commit end end - private + def schedule_retry + ::DirtyPipeline::Worker.perform_in( + retry_delay, + "enqueued_pipeline" => self.class.to_s, + "find_subject_args" => find_subject_args, + "retry" => true, + ) + end - attr_reader :locker + def schedule_cleanup + ::DirtyPipeline::Worker.perform_in( + cleanup_delay, + "enqueued_pipeline" => self.class.to_s, + "find_subject_args" => find_subject_args, + "transition_args" => [Locker::CLEAN], + ) + end + private + def find_subject_args subject.id end def retry_delay @@ -134,99 +167,33 @@ def cleanup_delay self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY end + def transaction(*args) + ::DirtyPipeline::Transaction.new(self).call(*args) do |*targs| + yield(*targs) + end + end + def Result() status.wrap { yield } end - def Retry(error, *args) - storage.save_retry!(error) - Shipping::PipelineWorker.perform_in( - retry_delay, - "enqueued_pipeline" => self.class.to_s, - "find_subject_args" => find_subject_args, - "retry" => true, - ) - end - - def ExpectedError(cause) - status.error = cause + def Failure(cause) storage.fail_event! + status.error = cause status.succeeded = false end - def Exception(error) - storage.save_exception!(error) - status.error = error - status.succeeded = false - end - def Abort() status.succeeded = false throw :abort_transaction, true end def Success(destination, output) cache.clear storage.complete!(output, destination) status.succeeded = true - end - - def try_again?(max_attempts_count) - return unless max_attempts_count - storage.last_event["attempts_count"].to_i < max_attempts_count - end - - def find_transition(name) - if (const_name = self.class.const_get(name) rescue nil) - name = const_name.to_s - end - self.class.transitions_map.fetch(name.to_s).tap do |from:, **kwargs| - next if from == Array(storage.status) - next if from.include?(storage.status.to_s) - raise InvalidTransition, "from `#{storage.status}` by `#{name}`" - end - end - - def schedule_cleanup - Shipping::PipelineWorker.perform_in( - cleanup_delay, - "enqueued_pipeline" => self.class.to_s, - "find_subject_args" => find_subject_args, - "transition_args" => [Locker::CLEAN], - ) - end - - def transaction(*args) - locker.with_lock(*args) do |transition, *transition_args| - begin - schedule_cleanup - destination, action, max_attempts_count = - find_transition(transition).values_at(:to, :action, :attempts) - - status.action_pool.unshift(action) - subject.transaction(requires_new: true) do - raise ActiveRecord::Rollback if catch(:abort_transaction) do - yield(destination, action, *transition_args); nil - end - end - rescue => error - if try_again?(max_attempts_count) - Retry(error) - else - Exception(error) - end - raise - ensure - if status.succeeded == false - status.action_pool.each do |reversable_action| - next unless reversable_action.respond_to?(:undo) - reversable_action.undo(self, *transition_args) - end - end - end - end end end end