lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.4 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.7.0

- old
+ new

@@ -20,12 +20,12 @@ attr_accessor :pipeline_storage, :retry_delay, :cleanup_delay using StringCamelcase def transition(name, from:, to:, action: nil, attempts: 1) - action ||= const_get(name.to_s.camelcase(:upper)) rescue nil action ||= method(name) if respond_to?(name) + action ||= const_get(name.to_s.camelcase(:upper)) @transitions_map[name.to_s] = { action: action, from: Array(from).map(&:to_s), to: to.to_s, attempts: attempts, @@ -59,16 +59,17 @@ storage.reset! reset! end # FIXME operation :call - argument - def chain(*args) - railway[:call] << Event.create(*args, tx_id: @uuid) + def chain(*args, operation: :call) + railway[operation] << Event.create(*args, tx_id: @uuid) self end def call + # HANDLE ANOTHER ACTION IN PROGRESS EXPLICITLY return self if (enqueued_event = railway.next).nil? execute(load_event(enqueued_event)) end alias :call_next :call @@ -104,10 +105,15 @@ def schedule_cleanup; schedule("cleanup", cleanup_delay); end def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end def schedule_retry; schedule("retry", retry_delay); end + def when_skipped + yield(nil, self) if railway.other_transaction_in_progress? + self + end + def when_success yield(status.data, self) if status.success? self end @@ -116,14 +122,14 @@ self end private - def execute(event, attempt_retry: false) attempt_retry ? event.attempt_retry! : event.start! + # dispatch event? Transaction.new(self, event).call do |destination, action, *args| state_changes = process_action(action, event, *args) next if status.failure? Success(event, state_changes, destination) end @@ -135,11 +141,11 @@ def load_event(enqueued_event) storage.find_event(enqueued_event.id) || enqueued_event end def process_action(action, event, *args) - return catch(:success) do + catch(:success) do return if interupt_on_error(event) do throw :success, run_operation(action, event, *args) end nil end @@ -147,9 +153,10 @@ Failure(event, exception, type: :exception) raise end def run_operation(action, event, *args) + raise ArgumentError unless action return unless action.respond_to?(operation = railway.active) action.public_send(operation, event, self, *args) end def interupt_on_error(event)