lib/dirty_pipeline/base.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.0

- old
+ new

@@ -20,11 +20,12 @@ attr_accessor :pipeline_storage, :retry_delay, :cleanup_delay using StringCamelcase def transition(name, from:, to:, action: nil, attempts: 1) - action ||= const_get(name.to_s.camelcase) + action ||= const_get(name.to_s.camelcase(:upper)) rescue nil + action ||= method(name) if respond_to?(name) @transitions_map[name.to_s] = { action: action, from: Array(from).map(&:to_s), to: to.to_s, attempts: attempts, @@ -53,22 +54,22 @@ def reset! railway.clear! end def clear! - storage.clear! + storage.reset! reset! end def chain(*args) railway[:call] << Event.create(*args, tx_id: @uuid) self end def call return self if (serialized_event = railway.next).nil? - execute(load_event(serialized_event)) + execute(load_event(serialized_event), tx_method: :call) end alias :call_next :call def clean railway.switch_to(:undo) @@ -76,11 +77,11 @@ self end def retry return unless (event = load_event(railway.queue.processing_event)) - execute(event, :retry) + execute(event, tx_id: :retry) end def schedule_cleanup schedule("cleanup", cleanup_delay) end @@ -114,12 +115,12 @@ self end private - def execute(event, type = :call) - transaction(event).public_send(type) do |destination, action, *args| + def execute(event, tx_method:) + transaction(event).public_send(tx_method) do |destination, action, *args| state_changes = process_action(action, event, *args) next if status.failure? Success(event, state_changes, destination) end call_next @@ -132,13 +133,11 @@ end def process_action(action, event, *args) return catch(:success) do return if interupt_on_error(event) do - return if interupt_on_abort(event) do - throw :success, run_operation(action, event, *args); nil - end + throw :success, run_operation(action, event, *args) end nil end rescue => exception @status = Status.failure(exception, tag: :exception) @@ -149,18 +148,14 @@ return unless action.respond_to?(operation = railway.operation) action.public_send(operation, event, self, *args) end def interupt_on_error(event) - return if (fail_cause = catch(:fail_with_error) { yield; nil }).nil? + return unless (fail_cause = catch(:fail_operation) { yield; nil }) Failure(event, fail_cause) end - def interupt_on_abort(event) - Abort(event) if catch(:abort) { yield; nil } - end - def find_subject_args subject.id end def retry_delay @@ -170,23 +165,21 @@ def cleanup_delay self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY end def transaction(event) - ::DirtyPipeline::Transaction.new(self, railway.queue, event) + Transaction.new(self, event) end def Failure(event, cause) - event.failure! railway.switch_to(:undo) - @status = Status.failure(cause, tag: :error) - throw :abort_transaction, true - end - - def Abort(event) - event.failure! - railway.switch_to(:undo) - @status = Status.failure(subject, tag: :aborted) + if cause.eql?(:abort) + event.abort! + @status = Status.failure(subject, tag: :aborted) + else + event.failure! + @status = Status.failure(cause, tag: :error) + end throw :abort_transaction, true end def Success(event, changes, destination) event.complete(changes, destination)