lib/dirty_pipeline/base.rb in dirty_pipeline-0.2.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.3.0

- old
+ new

@@ -74,109 +74,57 @@ def reset! storage.reset_pipeline_status! end - attr_reader :subject, :error, :storage + def clear! + storage.clear! + end + + def cache + storage.store["cache"] + end + + attr_reader :subject, :error, :storage, :status def initialize(subject) @subject = subject @storage = Storage.new(subject, self.class.pipeline_storage) @locker = Locker.new(@subject, @storage) + @status = Status.new(self) end def call(*args) - return self if succeeded == false - self.succeeded = nil - after_commit = nil + Result() do + after_commit = nil + # transaction with support of external calls + transaction(*args) do |destination, action, *targs| + output = {} + fail_cause = nil - # transaction with support of external calls - transaction(*args) do |destination, action, *transition_args| - output = {} - fail_cause = nil - - output, *after_commit = catch(:success) do - fail_cause = catch(:fail_with_error) do - return Abort() if catch(:abort) do - throw :success, action.(subject, *transition_args) + output, *after_commit = catch(:success) do + fail_cause = catch(:fail_with_error) do + Abort() if catch(:abort) do + throw :success, action.(self, *targs) + end end + nil end - nil - end - if fail_cause - ExpectedError(fail_cause) - else - Success(destination, output) + if fail_cause + ExpectedError(fail_cause) + else + Success(destination, output) + end end - end - Array(after_commit).each { |cb| cb.call(subject) } if after_commit - self - end - - def clear! - storage.clear! - end - - def success? - succeeded - end - - def when_success(callback = nil) - return self unless success? - if block_given? - yield(self) - else - callback.call(self) + Array(after_commit).each { |cb| cb.call(subject) } if after_commit end - self end - def when_failed(callback = nil) - return self unless storage.failed? - if block_given? - yield(self) - else - callback.call(self) - end - self - end - - def errored? - return if succeeded.nil? - ready? && !succeeded - end - - def when_error(callback = nil) - return self unless errored? - if block_given? - yield(self) - else - callback.call(self) - end - self - end - - def ready? - storage.pipeline_status.nil? - end - - def when_processing(callback = nil) - return self unless storage.processing? - if block_given? - yield(self) - else - callback.call(self) - end - self - end - private - attr_writer :error attr_reader :locker - attr_accessor :succeeded, :previous_status def find_subject_args subject.id end @@ -186,10 +134,14 @@ def cleanup_delay self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY end + def Result() + status.wrap { yield } + end + def Retry(error, *args) storage.save_retry!(error) Shipping::PipelineWorker.perform_in( retry_delay, "enqueued_pipeline" => self.class.to_s, @@ -197,29 +149,30 @@ "retry" => true, ) end def ExpectedError(cause) - self.error = cause + status.error = cause storage.fail_event! - self.succeeded = false + status.succeeded = false end - def Error(error) + def Exception(error) storage.save_exception!(error) - self.error = error - self.succeeded = false + status.error = error + status.succeeded = false end def Abort() - self.succeeded = false + status.succeeded = false throw :abort_transaction, true end def Success(destination, output) + cache.clear storage.complete!(output, destination) - self.succeeded = true + status.succeeded = true end def try_again?(max_attempts_count) return unless max_attempts_count storage.last_event["attempts_count"].to_i < max_attempts_count @@ -250,22 +203,29 @@ begin schedule_cleanup destination, action, max_attempts_count = find_transition(transition).values_at(:to, :action, :attempts) + status.action_pool.unshift(action) subject.transaction(requires_new: true) do raise ActiveRecord::Rollback if catch(:abort_transaction) do yield(destination, action, *transition_args); nil end end rescue => error if try_again?(max_attempts_count) Retry(error) else - # FIXME: Somehow :error is a Hash, all the time - Error(error) + Exception(error) end raise + ensure + if status.succeeded == false + status.action_pool.each do |reversable_action| + next unless reversable_action.respond_to?(:undo) + reversable_action.undo(self, *transition_args) + end + end end end end end end