lib/dirty_pipeline/base.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.0
- old
+ new
@@ -20,11 +20,12 @@
attr_accessor :pipeline_storage, :retry_delay, :cleanup_delay
using StringCamelcase
def transition(name, from:, to:, action: nil, attempts: 1)
- action ||= const_get(name.to_s.camelcase)
+ action ||= const_get(name.to_s.camelcase(:upper)) rescue nil
+ action ||= method(name) if respond_to?(name)
@transitions_map[name.to_s] = {
action: action,
from: Array(from).map(&:to_s),
to: to.to_s,
attempts: attempts,
@@ -53,22 +54,22 @@
def reset!
railway.clear!
end
def clear!
- storage.clear!
+ storage.reset!
reset!
end
def chain(*args)
railway[:call] << Event.create(*args, tx_id: @uuid)
self
end
def call
return self if (serialized_event = railway.next).nil?
- execute(load_event(serialized_event))
+ execute(load_event(serialized_event), tx_method: :call)
end
alias :call_next :call
def clean
railway.switch_to(:undo)
@@ -76,11 +77,11 @@
self
end
def retry
return unless (event = load_event(railway.queue.processing_event))
- execute(event, :retry)
+ execute(event, tx_id: :retry)
end
def schedule_cleanup
schedule("cleanup", cleanup_delay)
end
@@ -114,12 +115,12 @@
self
end
private
- def execute(event, type = :call)
- transaction(event).public_send(type) do |destination, action, *args|
+ def execute(event, tx_method:)
+ transaction(event).public_send(tx_method) do |destination, action, *args|
state_changes = process_action(action, event, *args)
next if status.failure?
Success(event, state_changes, destination)
end
call_next
@@ -132,13 +133,11 @@
end
def process_action(action, event, *args)
return catch(:success) do
return if interupt_on_error(event) do
- return if interupt_on_abort(event) do
- throw :success, run_operation(action, event, *args); nil
- end
+ throw :success, run_operation(action, event, *args)
end
nil
end
rescue => exception
@status = Status.failure(exception, tag: :exception)
@@ -149,18 +148,14 @@
return unless action.respond_to?(operation = railway.operation)
action.public_send(operation, event, self, *args)
end
def interupt_on_error(event)
- return if (fail_cause = catch(:fail_with_error) { yield; nil }).nil?
+ return unless (fail_cause = catch(:fail_operation) { yield; nil })
Failure(event, fail_cause)
end
- def interupt_on_abort(event)
- Abort(event) if catch(:abort) { yield; nil }
- end
-
def find_subject_args
subject.id
end
def retry_delay
@@ -170,23 +165,21 @@
def cleanup_delay
self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY
end
def transaction(event)
- ::DirtyPipeline::Transaction.new(self, railway.queue, event)
+ Transaction.new(self, event)
end
def Failure(event, cause)
- event.failure!
railway.switch_to(:undo)
- @status = Status.failure(cause, tag: :error)
- throw :abort_transaction, true
- end
-
- def Abort(event)
- event.failure!
- railway.switch_to(:undo)
- @status = Status.failure(subject, tag: :aborted)
+ if cause.eql?(:abort)
+ event.abort!
+ @status = Status.failure(subject, tag: :aborted)
+ else
+ event.failure!
+ @status = Status.failure(cause, tag: :error)
+ end
throw :abort_transaction, true
end
def Success(event, changes, destination)
event.complete(changes, destination)