lib/dirty_pipeline/base.rb in dirty_pipeline-0.4.0 vs lib/dirty_pipeline/base.rb in dirty_pipeline-0.5.0

- old
+ new

@@ -1,163 +1,165 @@ module DirtyPipeline class Base DEFAULT_RETRY_DELAY = 5 * 60 # 5 minutes DEFAULT_CLEANUP_DELAY = 60 * 60 * 24 # 1 day - RESERVED_STATUSES = [ - Storage::FAILED_STATUS, - Storage::PROCESSING_STATUS, - Storage::RETRY_STATUS, - Locker::CLEAN, - ] - class ReservedStatusError < StandardError; end class InvalidTransition < StandardError; end class << self def find_subject(*args) fail NotImplemented end attr_reader :transitions_map def inherited(child) - child.instance_variable_set(:@transitions_map, Hash.new) + child.instance_variable_set( + :@transitions_map, + transitions_map || Hash.new + ) end - # PG JSONB column - # { - # status: :errored, - # state: { - # field: "value", - # }, - # errors: [ - # { - # error: "RuPost::API::Error", - # error_message: "Timeout error", - # created_at: 2018-01-01T13:22Z - # }, - # ], - # events: [ - # { - # action: Init, - # input: ..., - # created_at: ..., - # updated_at: ..., - # attempts_count: 2, - # }, - # {...}, - # ] - # } attr_accessor :pipeline_storage, :retry_delay, :cleanup_delay - def transition(action, from:, to:, name: action.to_s, attempts: 1) - raise ReservedStatusError unless valid_statuses?(from, to) - @transitions_map[name] = { + using StringCamelcase + + def transition(name, from:, to:, action: nil, attempts: 1) + action ||= const_get(name.to_s.camelcase) + @transitions_map[name.to_s] = { action: action, from: Array(from).map(&:to_s), to: to.to_s, attempts: attempts, } end - - private - - def valid_statuses?(from, to) - ((Array(to) + Array(from)) & RESERVED_STATUSES).empty? - end end - attr_reader :subject, :error, :storage, :status, :transitions_chain - def initialize(subject) + attr_reader :subject, :storage, :status, :uuid, :queue, :railway + def initialize(subject, uuid: nil) + @uuid = uuid || Nanoid.generate @subject = subject @storage = Storage.new(subject, self.class.pipeline_storage) - @status = Status.new(self) - @transitions_chain = [] + @railway = Railway.new(subject, @uuid) + @status = Status.success(subject) end - def enqueue(transition_name, *args) - DirtyPipeline::Worker.perform_async( - "enqueued_pipeline" => self.class.to_s, - "find_subject_args" => find_subject_args, - "transition_args" => args.unshift(transition_name), - ) + def find_transition(name) + self.class.transitions_map.fetch(name.to_s).tap do |from:, **kwargs| + next unless railway.operation.eql?(:call) + next if from == Array(storage.status) + next if from.include?(storage.status.to_s) + raise InvalidTransition, "from `#{storage.status}` by `#{name}`" + end end def reset! - storage.reset_pipeline_status! + railway.clear! end def clear! storage.clear! + reset! end - def cache - storage.last_event["cache"] + def chain(*args) + railway[:call] << Event.create(*args, tx_id: @uuid) + self end - def chain(*args) - transitions_chain << args + def call + return self if (serialized_event = railway.next).nil? + execute(load_event(serialized_event)) + end + alias :call_next :call + + def clean + railway.switch_to(:undo) + call_next self end - def execute - Result() do - transitions_chain.each do |targs| - call(*targs) - storage.increment_transaction_depth! - end - storage.reset_transaction_depth! - transitions_chain.clear + def retry + return unless (event = load_event(railway.queue.processing_event)) + execute(event, :retry) + end + + def schedule_cleanup + schedule("cleanup", cleanup_delay) + end + + def schedule_retry + schedule("retry", retry_delay) + end + + def schedule(operation, delay = nil) + job_args = { + "transaction_id" => @uuid, + "enqueued_pipeline" => self.class.to_s, + "find_subject_args" => find_subject_args, + "operation" => operation, + } + + if delay.nil? + ::DirtyPipeline::Worker.perform_async(job_args) + else + ::DirtyPipeline::Worker.perform_in(delay, job_args) end end - def call(*args) - storage.reset_transaction_depth! if transitions_chain.empty? - Result() do - after_commit = nil - # transaction with support of external calls - transaction(*args) do |destination, action, *targs| - output = {} - fail_cause = nil + def when_success + yield(status.data, self) if status.success? + self + end - output, *after_commit = catch(:success) do - fail_cause = catch(:fail_with_error) do - Abort() if catch(:abort) do - throw :success, action.(self, *targs) - end - end - nil - end + def when_failure(tag = status.tag) + yield(status.data, self) if status.failure? && status.tag == tag + self + end - if fail_cause - Failure(fail_cause) - else - Success(destination, output) + private + + def execute(event, type = :call) + transaction(event).public_send(type) do |destination, action, *args| + state_changes = process_action(action, event, *args) + next if status.failure? + Success(event, state_changes, destination) + end + call_next + + self + end + + def load_event(enqueued_event) + storage.find_event(enqueued_event.id) || enqueued_event + end + + def process_action(action, event, *args) + return catch(:success) do + return if interupt_on_error(event) do + return if interupt_on_abort(event) do + throw :success, run_operation(action, event, *args); nil end end - - Array(after_commit).each { |cb| cb.call(subject) } if after_commit + nil end + rescue => exception + @status = Status.failure(exception, tag: :exception) + raise end - def schedule_retry - ::DirtyPipeline::Worker.perform_in( - retry_delay, - "enqueued_pipeline" => self.class.to_s, - "find_subject_args" => find_subject_args, - "retry" => true, - ) + def run_operation(action, event, *args) + return unless action.respond_to?(operation = railway.operation) + action.public_send(operation, event, self, *args) end - def schedule_cleanup - ::DirtyPipeline::Worker.perform_in( - cleanup_delay, - "enqueued_pipeline" => self.class.to_s, - "find_subject_args" => find_subject_args, - "transition_args" => [Locker::CLEAN], - ) + def interupt_on_error(event) + return if (fail_cause = catch(:fail_with_error) { yield; nil }).nil? + Failure(event, fail_cause) end - private + def interupt_on_abort(event) + Abort(event) if catch(:abort) { yield; nil } + end def find_subject_args subject.id end @@ -167,33 +169,29 @@ def cleanup_delay self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY end - def transaction(*args) - ::DirtyPipeline::Transaction.new(self).call(*args) do |*targs| - yield(*targs) - end + def transaction(event) + ::DirtyPipeline::Transaction.new(self, railway.queue, event) end - def Result() - status.wrap { yield } + def Failure(event, cause) + event.failure! + railway.switch_to(:undo) + @status = Status.failure(cause, tag: :error) + throw :abort_transaction, true end - def Failure(cause) - storage.fail_event! - status.error = cause - status.succeeded = false - end - - def Abort() - status.succeeded = false + def Abort(event) + event.failure! + railway.switch_to(:undo) + @status = Status.failure(subject, tag: :aborted) throw :abort_transaction, true end - def Success(destination, output) - cache.clear - storage.complete!(output, destination) - status.succeeded = true + def Success(event, changes, destination) + event.complete(changes, destination) + @status = Status.success(subject) end end end