lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.1

- old
+ new

@@ -33,11 +33,11 @@ end end attr_reader :subject, :storage, :status, :uuid, :queue, :railway def initialize(subject, uuid: nil) - @uuid = uuid || Nanoid.generate + @uuid = uuid || SecureRandom.uuid @subject = subject @storage = Storage.new(subject, self.class.pipeline_storage) @railway = Railway.new(subject, @uuid) @status = Status.success(subject) end @@ -58,40 +58,35 @@ def clear! storage.reset! reset! end + # FIXME operation :call - argument def chain(*args) railway[:call] << Event.create(*args, tx_id: @uuid) self end def call - return self if (serialized_event = railway.next).nil? - execute(load_event(serialized_event), tx_method: :call) + return self if (enqueued_event = railway.next).nil? + execute(load_event(enqueued_event)) end alias :call_next :call def clean + finished = railway.queue.to_a.empty? + finished &&= railway.queue.processing_event.nil? + return self if finished railway.switch_to(:undo) - call_next - self + call end def retry - return unless (event = load_event(railway.queue.processing_event)) - execute(event, tx_id: :retry) + return self if (enqueued_event = railway.queue.processing_event).nil? + execute(load_event(enqueued_event), attempt_retry: true) end - def schedule_cleanup - schedule("cleanup", cleanup_delay) - end - - def schedule_retry - schedule("retry", retry_delay) - end - def schedule(operation, delay = nil) job_args = { "transaction_id" => @uuid, "enqueued_pipeline" => self.class.to_s, "find_subject_args" => find_subject_args, @@ -103,10 +98,16 @@ else ::DirtyPipeline::Worker.perform_in(delay, job_args) end end + def cleanup_delay; self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY; end + def schedule_cleanup; schedule("cleanup", cleanup_delay); end + + def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end + def schedule_retry; schedule("retry", retry_delay); end + def when_success yield(status.data, self) if status.success? self end @@ -115,12 +116,15 @@ self end private - def execute(event, tx_method:) - transaction(event).public_send(tx_method) do |destination, action, *args| + + def execute(event, attempt_retry: false) + attempt_retry ? event.attempt_retry! : event.start! + + Transaction.new(self, event).call do |destination, action, *args| state_changes = process_action(action, event, *args) next if status.failure? Success(event, state_changes, destination) end call_next @@ -138,48 +142,31 @@ throw :success, run_operation(action, event, *args) end nil end rescue => exception - @status = Status.failure(exception, tag: :exception) + Failure(event, exception, type: :exception) raise end def run_operation(action, event, *args) - return unless action.respond_to?(operation = railway.operation) + return unless action.respond_to?(operation = railway.active) action.public_send(operation, event, self, *args) end def interupt_on_error(event) - return unless (fail_cause = catch(:fail_operation) { yield; nil }) - Failure(event, fail_cause) + return unless (fail_cause = catch(:fail_transition) { yield; nil }) + Failure(event, fail_cause, type: :error) end def find_subject_args subject.id end - def retry_delay - self.class.retry_delay || DEFAULT_RETRY_DELAY - end - - def cleanup_delay - self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY - end - - def transaction(event) - Transaction.new(self, event) - end - - def Failure(event, cause) + def Failure(event, cause, type:) railway.switch_to(:undo) - if cause.eql?(:abort) - event.abort! - @status = Status.failure(subject, tag: :aborted) - else - event.failure! - @status = Status.failure(cause, tag: :error) - end + event.failure! + @status = Status.failure(cause, tag: type) throw :abort_transaction, true end def Success(event, changes, destination) event.complete(changes, destination)