lib/dirty_pipeline/base.rb in dirty_pipeline-0.4.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.5.0
- old
+ new
@@ -1,163 +1,165 @@
module DirtyPipeline
class Base
DEFAULT_RETRY_DELAY = 5 * 60 # 5 minutes
DEFAULT_CLEANUP_DELAY = 60 * 60 * 24 # 1 day
- RESERVED_STATUSES = [
- Storage::FAILED_STATUS,
- Storage::PROCESSING_STATUS,
- Storage::RETRY_STATUS,
- Locker::CLEAN,
- ]
- class ReservedStatusError < StandardError; end
class InvalidTransition < StandardError; end
class << self
def find_subject(*args)
fail NotImplemented
end
attr_reader :transitions_map
def inherited(child)
- child.instance_variable_set(:@transitions_map, Hash.new)
+ child.instance_variable_set(
+ :@transitions_map,
+ transitions_map || Hash.new
+ )
end
- # PG JSONB column
- # {
- # status: :errored,
- # state: {
- # field: "value",
- # },
- # errors: [
- # {
- # error: "RuPost::API::Error",
- # error_message: "Timeout error",
- # created_at: 2018-01-01T13:22Z
- # },
- # ],
- # events: [
- # {
- # action: Init,
- # input: ...,
- # created_at: ...,
- # updated_at: ...,
- # attempts_count: 2,
- # },
- # {...},
- # ]
- # }
attr_accessor :pipeline_storage, :retry_delay, :cleanup_delay
- def transition(action, from:, to:, name: action.to_s, attempts: 1)
- raise ReservedStatusError unless valid_statuses?(from, to)
- @transitions_map[name] = {
+ using StringCamelcase
+
+ def transition(name, from:, to:, action: nil, attempts: 1)
+ action ||= const_get(name.to_s.camelcase)
+ @transitions_map[name.to_s] = {
action: action,
from: Array(from).map(&:to_s),
to: to.to_s,
attempts: attempts,
}
end
-
- private
-
- def valid_statuses?(from, to)
- ((Array(to) + Array(from)) & RESERVED_STATUSES).empty?
- end
end
- attr_reader :subject, :error, :storage, :status, :transitions_chain
- def initialize(subject)
+ attr_reader :subject, :storage, :status, :uuid, :queue, :railway
+ def initialize(subject, uuid: nil)
+ @uuid = uuid || Nanoid.generate
@subject = subject
@storage = Storage.new(subject, self.class.pipeline_storage)
- @status = Status.new(self)
- @transitions_chain = []
+ @railway = Railway.new(subject, @uuid)
+ @status = Status.success(subject)
end
- def enqueue(transition_name, *args)
- DirtyPipeline::Worker.perform_async(
- "enqueued_pipeline" => self.class.to_s,
- "find_subject_args" => find_subject_args,
- "transition_args" => args.unshift(transition_name),
- )
+ def find_transition(name)
+ self.class.transitions_map.fetch(name.to_s).tap do |from:, **kwargs|
+ next unless railway.operation.eql?(:call)
+ next if from == Array(storage.status)
+ next if from.include?(storage.status.to_s)
+ raise InvalidTransition, "from `#{storage.status}` by `#{name}`"
+ end
end
def reset!
- storage.reset_pipeline_status!
+ railway.clear!
end
def clear!
storage.clear!
+ reset!
end
- def cache
- storage.last_event["cache"]
+ def chain(*args)
+ railway[:call] << Event.create(*args, tx_id: @uuid)
+ self
end
- def chain(*args)
- transitions_chain << args
+ def call
+ return self if (serialized_event = railway.next).nil?
+ execute(load_event(serialized_event))
+ end
+ alias :call_next :call
+
+ def clean
+ railway.switch_to(:undo)
+ call_next
self
end
- def execute
- Result() do
- transitions_chain.each do |targs|
- call(*targs)
- storage.increment_transaction_depth!
- end
- storage.reset_transaction_depth!
- transitions_chain.clear
+ def retry
+ return unless (event = load_event(railway.queue.processing_event))
+ execute(event, :retry)
+ end
+
+ def schedule_cleanup
+ schedule("cleanup", cleanup_delay)
+ end
+
+ def schedule_retry
+ schedule("retry", retry_delay)
+ end
+
+ def schedule(operation, delay = nil)
+ job_args = {
+ "transaction_id" => @uuid,
+ "enqueued_pipeline" => self.class.to_s,
+ "find_subject_args" => find_subject_args,
+ "operation" => operation,
+ }
+
+ if delay.nil?
+ ::DirtyPipeline::Worker.perform_async(job_args)
+ else
+ ::DirtyPipeline::Worker.perform_in(delay, job_args)
end
end
- def call(*args)
- storage.reset_transaction_depth! if transitions_chain.empty?
- Result() do
- after_commit = nil
- # transaction with support of external calls
- transaction(*args) do |destination, action, *targs|
- output = {}
- fail_cause = nil
+ def when_success
+ yield(status.data, self) if status.success?
+ self
+ end
- output, *after_commit = catch(:success) do
- fail_cause = catch(:fail_with_error) do
- Abort() if catch(:abort) do
- throw :success, action.(self, *targs)
- end
- end
- nil
- end
+ def when_failure(tag = status.tag)
+ yield(status.data, self) if status.failure? && status.tag == tag
+ self
+ end
- if fail_cause
- Failure(fail_cause)
- else
- Success(destination, output)
+ private
+
+ def execute(event, type = :call)
+ transaction(event).public_send(type) do |destination, action, *args|
+ state_changes = process_action(action, event, *args)
+ next if status.failure?
+ Success(event, state_changes, destination)
+ end
+ call_next
+
+ self
+ end
+
+ def load_event(enqueued_event)
+ storage.find_event(enqueued_event.id) || enqueued_event
+ end
+
+ def process_action(action, event, *args)
+ return catch(:success) do
+ return if interupt_on_error(event) do
+ return if interupt_on_abort(event) do
+ throw :success, run_operation(action, event, *args); nil
end
end
-
- Array(after_commit).each { |cb| cb.call(subject) } if after_commit
+ nil
end
+ rescue => exception
+ @status = Status.failure(exception, tag: :exception)
+ raise
end
- def schedule_retry
- ::DirtyPipeline::Worker.perform_in(
- retry_delay,
- "enqueued_pipeline" => self.class.to_s,
- "find_subject_args" => find_subject_args,
- "retry" => true,
- )
+ def run_operation(action, event, *args)
+ return unless action.respond_to?(operation = railway.operation)
+ action.public_send(operation, event, self, *args)
end
- def schedule_cleanup
- ::DirtyPipeline::Worker.perform_in(
- cleanup_delay,
- "enqueued_pipeline" => self.class.to_s,
- "find_subject_args" => find_subject_args,
- "transition_args" => [Locker::CLEAN],
- )
+ def interupt_on_error(event)
+ return if (fail_cause = catch(:fail_with_error) { yield; nil }).nil?
+ Failure(event, fail_cause)
end
- private
+ def interupt_on_abort(event)
+ Abort(event) if catch(:abort) { yield; nil }
+ end
def find_subject_args
subject.id
end
@@ -167,33 +169,29 @@
def cleanup_delay
self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY
end
- def transaction(*args)
- ::DirtyPipeline::Transaction.new(self).call(*args) do |*targs|
- yield(*targs)
- end
+ def transaction(event)
+ ::DirtyPipeline::Transaction.new(self, railway.queue, event)
end
- def Result()
- status.wrap { yield }
+ def Failure(event, cause)
+ event.failure!
+ railway.switch_to(:undo)
+ @status = Status.failure(cause, tag: :error)
+ throw :abort_transaction, true
end
- def Failure(cause)
- storage.fail_event!
- status.error = cause
- status.succeeded = false
- end
-
- def Abort()
- status.succeeded = false
+ def Abort(event)
+ event.failure!
+ railway.switch_to(:undo)
+ @status = Status.failure(subject, tag: :aborted)
throw :abort_transaction, true
end
- def Success(destination, output)
- cache.clear
- storage.complete!(output, destination)
- status.succeeded = true
+ def Success(event, changes, destination)
+ event.complete(changes, destination)
+ @status = Status.success(subject)
end
end
end