Sha256: c179b9e23016618f5e85be322c7513493ee6f1f0dcd90dd46c08163f7ee2f4ae
Contents?: true
Size: 1.93 KB
Versions: 1
Compression:
Stored size: 1.93 KB
Contents
# frozen_string_literal: true module PubSubModelSync class Transaction < Base PUBLISHER_KLASS = PubSubModelSync::MessagePublisher attr_accessor :key, :payloads, :max_buffer, :root, :children, :finished # @param key (String|nil) Transaction key, if empty will use the ordering_key from first payload # @param max_buffer (Integer) Once this quantity of notifications is reached, then all notifications # will immediately be delivered. # Note: There is no way to rollback delivered notifications if current transaction fails def initialize(key, max_buffer: config.transactions_max_buffer) @key = key @max_buffer = max_buffer @children = [] @payloads = [] end # @param payload (Payload) def add_payload(payload) payloads << payload deliver_payloads if payloads.count >= max_buffer end def finish # rubocop:disable Metrics/AbcSize if root root.children = root.children.reject { |t| t == self } root.deliver_all if root.finished && root.children.empty? end self.finished = true deliver_all if children.empty? end def add_transaction(transaction) transaction.root = self children << transaction transaction end def rollback log("rollback #{payloads.count} notifications", :warn) if children.any? && debug? self.children = [] root&.rollback clean_publisher end def clean_publisher PUBLISHER_KLASS.current_transaction = nil if !root && children.empty? end def deliver_all deliver_payloads clean_publisher end private def deliver_payloads payloads.each do |payload| begin # rubocop:disable Style/RedundantBegin (ruby 2.4 support) PUBLISHER_KLASS.connector_publish(payload) rescue => e PUBLISHER_KLASS.send(:notify_error, e, payload) end end self.payloads = [] end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
pub_sub_model_sync-1.0.beta2 | lib/pub_sub_model_sync/transaction.rb |