lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.4 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.7.0
- old
+ new
@@ -20,12 +20,12 @@
attr_accessor :pipeline_storage, :retry_delay, :cleanup_delay
using StringCamelcase
def transition(name, from:, to:, action: nil, attempts: 1)
- action ||= const_get(name.to_s.camelcase(:upper)) rescue nil
action ||= method(name) if respond_to?(name)
+ action ||= const_get(name.to_s.camelcase(:upper))
@transitions_map[name.to_s] = {
action: action,
from: Array(from).map(&:to_s),
to: to.to_s,
attempts: attempts,
@@ -59,16 +59,17 @@
storage.reset!
reset!
end
# FIXME operation :call - argument
- def chain(*args)
- railway[:call] << Event.create(*args, tx_id: @uuid)
+ def chain(*args, operation: :call)
+ railway[operation] << Event.create(*args, tx_id: @uuid)
self
end
def call
+ # HANDLE ANOTHER ACTION IN PROGRESS EXPLICITLY
return self if (enqueued_event = railway.next).nil?
execute(load_event(enqueued_event))
end
alias :call_next :call
@@ -104,10 +105,15 @@
def schedule_cleanup; schedule("cleanup", cleanup_delay); end
def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end
def schedule_retry; schedule("retry", retry_delay); end
+ def when_skipped
+ yield(nil, self) if railway.other_transaction_in_progress?
+ self
+ end
+
def when_success
yield(status.data, self) if status.success?
self
end
@@ -116,14 +122,14 @@
self
end
private
-
def execute(event, attempt_retry: false)
attempt_retry ? event.attempt_retry! : event.start!
+ # dispatch event?
Transaction.new(self, event).call do |destination, action, *args|
state_changes = process_action(action, event, *args)
next if status.failure?
Success(event, state_changes, destination)
end
@@ -135,11 +141,11 @@
def load_event(enqueued_event)
storage.find_event(enqueued_event.id) || enqueued_event
end
def process_action(action, event, *args)
- return catch(:success) do
+ catch(:success) do
return if interupt_on_error(event) do
throw :success, run_operation(action, event, *args)
end
nil
end
@@ -147,9 +153,10 @@
Failure(event, exception, type: :exception)
raise
end
def run_operation(action, event, *args)
+ raise ArgumentError unless action
return unless action.respond_to?(operation = railway.active)
action.public_send(operation, event, self, *args)
end
def interupt_on_error(event)