lib/dirty_pipeline/base.rb in dirty_pipeline-0.2.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.3.0
- old
+ new
@@ -74,109 +74,57 @@
def reset!
storage.reset_pipeline_status!
end
- attr_reader :subject, :error, :storage
+ def clear!
+ storage.clear!
+ end
+
+ def cache
+ storage.store["cache"]
+ end
+
+ attr_reader :subject, :error, :storage, :status
def initialize(subject)
@subject = subject
@storage = Storage.new(subject, self.class.pipeline_storage)
@locker = Locker.new(@subject, @storage)
+ @status = Status.new(self)
end
def call(*args)
- return self if succeeded == false
- self.succeeded = nil
- after_commit = nil
+ Result() do
+ after_commit = nil
+ # transaction with support of external calls
+ transaction(*args) do |destination, action, *targs|
+ output = {}
+ fail_cause = nil
- # transaction with support of external calls
- transaction(*args) do |destination, action, *transition_args|
- output = {}
- fail_cause = nil
-
- output, *after_commit = catch(:success) do
- fail_cause = catch(:fail_with_error) do
- return Abort() if catch(:abort) do
- throw :success, action.(subject, *transition_args)
+ output, *after_commit = catch(:success) do
+ fail_cause = catch(:fail_with_error) do
+ Abort() if catch(:abort) do
+ throw :success, action.(self, *targs)
+ end
end
+ nil
end
- nil
- end
- if fail_cause
- ExpectedError(fail_cause)
- else
- Success(destination, output)
+ if fail_cause
+ ExpectedError(fail_cause)
+ else
+ Success(destination, output)
+ end
end
- end
- Array(after_commit).each { |cb| cb.call(subject) } if after_commit
- self
- end
-
- def clear!
- storage.clear!
- end
-
- def success?
- succeeded
- end
-
- def when_success(callback = nil)
- return self unless success?
- if block_given?
- yield(self)
- else
- callback.call(self)
+ Array(after_commit).each { |cb| cb.call(subject) } if after_commit
end
- self
end
- def when_failed(callback = nil)
- return self unless storage.failed?
- if block_given?
- yield(self)
- else
- callback.call(self)
- end
- self
- end
-
- def errored?
- return if succeeded.nil?
- ready? && !succeeded
- end
-
- def when_error(callback = nil)
- return self unless errored?
- if block_given?
- yield(self)
- else
- callback.call(self)
- end
- self
- end
-
- def ready?
- storage.pipeline_status.nil?
- end
-
- def when_processing(callback = nil)
- return self unless storage.processing?
- if block_given?
- yield(self)
- else
- callback.call(self)
- end
- self
- end
-
private
- attr_writer :error
attr_reader :locker
- attr_accessor :succeeded, :previous_status
def find_subject_args
subject.id
end
@@ -186,10 +134,14 @@
def cleanup_delay
self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY
end
+ def Result()
+ status.wrap { yield }
+ end
+
def Retry(error, *args)
storage.save_retry!(error)
Shipping::PipelineWorker.perform_in(
retry_delay,
"enqueued_pipeline" => self.class.to_s,
@@ -197,29 +149,30 @@
"retry" => true,
)
end
def ExpectedError(cause)
- self.error = cause
+ status.error = cause
storage.fail_event!
- self.succeeded = false
+ status.succeeded = false
end
- def Error(error)
+ def Exception(error)
storage.save_exception!(error)
- self.error = error
- self.succeeded = false
+ status.error = error
+ status.succeeded = false
end
def Abort()
- self.succeeded = false
+ status.succeeded = false
throw :abort_transaction, true
end
def Success(destination, output)
+ cache.clear
storage.complete!(output, destination)
- self.succeeded = true
+ status.succeeded = true
end
def try_again?(max_attempts_count)
return unless max_attempts_count
storage.last_event["attempts_count"].to_i < max_attempts_count
@@ -250,22 +203,29 @@
begin
schedule_cleanup
destination, action, max_attempts_count =
find_transition(transition).values_at(:to, :action, :attempts)
+ status.action_pool.unshift(action)
subject.transaction(requires_new: true) do
raise ActiveRecord::Rollback if catch(:abort_transaction) do
yield(destination, action, *transition_args); nil
end
end
rescue => error
if try_again?(max_attempts_count)
Retry(error)
else
- # FIXME: Somehow :error is a Hash, all the time
- Error(error)
+ Exception(error)
end
raise
+ ensure
+ if status.succeeded == false
+ status.action_pool.each do |reversable_action|
+ next unless reversable_action.respond_to?(:undo)
+ reversable_action.undo(self, *transition_args)
+ end
+ end
end
end
end
end
end