lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.6.1
- old
+ new
@@ -33,11 +33,11 @@
end
end
attr_reader :subject, :storage, :status, :uuid, :queue, :railway
def initialize(subject, uuid: nil)
- @uuid = uuid || Nanoid.generate
+ @uuid = uuid || SecureRandom.uuid
@subject = subject
@storage = Storage.new(subject, self.class.pipeline_storage)
@railway = Railway.new(subject, @uuid)
@status = Status.success(subject)
end
@@ -58,40 +58,35 @@
def clear!
storage.reset!
reset!
end
+ # FIXME operation :call - argument
def chain(*args)
railway[:call] << Event.create(*args, tx_id: @uuid)
self
end
def call
- return self if (serialized_event = railway.next).nil?
- execute(load_event(serialized_event), tx_method: :call)
+ return self if (enqueued_event = railway.next).nil?
+ execute(load_event(enqueued_event))
end
alias :call_next :call
def clean
+ finished = railway.queue.to_a.empty?
+ finished &&= railway.queue.processing_event.nil?
+ return self if finished
railway.switch_to(:undo)
- call_next
- self
+ call
end
def retry
- return unless (event = load_event(railway.queue.processing_event))
- execute(event, tx_id: :retry)
+ return self if (enqueued_event = railway.queue.processing_event).nil?
+ execute(load_event(enqueued_event), attempt_retry: true)
end
- def schedule_cleanup
- schedule("cleanup", cleanup_delay)
- end
-
- def schedule_retry
- schedule("retry", retry_delay)
- end
-
def schedule(operation, delay = nil)
job_args = {
"transaction_id" => @uuid,
"enqueued_pipeline" => self.class.to_s,
"find_subject_args" => find_subject_args,
@@ -103,10 +98,16 @@
else
::DirtyPipeline::Worker.perform_in(delay, job_args)
end
end
+ def cleanup_delay; self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY; end
+ def schedule_cleanup; schedule("cleanup", cleanup_delay); end
+
+ def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end
+ def schedule_retry; schedule("retry", retry_delay); end
+
def when_success
yield(status.data, self) if status.success?
self
end
@@ -115,12 +116,15 @@
self
end
private
- def execute(event, tx_method:)
- transaction(event).public_send(tx_method) do |destination, action, *args|
+
+ def execute(event, attempt_retry: false)
+ attempt_retry ? event.attempt_retry! : event.start!
+
+ Transaction.new(self, event).call do |destination, action, *args|
state_changes = process_action(action, event, *args)
next if status.failure?
Success(event, state_changes, destination)
end
call_next
@@ -138,48 +142,31 @@
throw :success, run_operation(action, event, *args)
end
nil
end
rescue => exception
- @status = Status.failure(exception, tag: :exception)
+ Failure(event, exception, type: :exception)
raise
end
def run_operation(action, event, *args)
- return unless action.respond_to?(operation = railway.operation)
+ return unless action.respond_to?(operation = railway.active)
action.public_send(operation, event, self, *args)
end
def interupt_on_error(event)
- return unless (fail_cause = catch(:fail_operation) { yield; nil })
- Failure(event, fail_cause)
+ return unless (fail_cause = catch(:fail_transition) { yield; nil })
+ Failure(event, fail_cause, type: :error)
end
def find_subject_args
subject.id
end
- def retry_delay
- self.class.retry_delay || DEFAULT_RETRY_DELAY
- end
-
- def cleanup_delay
- self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY
- end
-
- def transaction(event)
- Transaction.new(self, event)
- end
-
- def Failure(event, cause)
+ def Failure(event, cause, type:)
railway.switch_to(:undo)
- if cause.eql?(:abort)
- event.abort!
- @status = Status.failure(subject, tag: :aborted)
- else
- event.failure!
- @status = Status.failure(cause, tag: :error)
- end
+ event.failure!
+ @status = Status.failure(cause, tag: type)
throw :abort_transaction, true
end
def Success(event, changes, destination)
event.complete(changes, destination)