# frozen_string_literal: true module Deimos module ActiveRecordConsume # Methods for consuming individual messages and saving them to the database # as ActiveRecord instances. module MessageConsumption # Find the record specified by the given payload and key. # Default is to use the primary key column and the value of the first # field in the key. # @param klass [Class] # @param _payload [Hash,Deimos::SchemaClass::Record] # @param key [Object] # @return [ActiveRecord::Base] def fetch_record(klass, _payload, key) fetch_key = key.is_a?(Hash) && key.size == 1 ? key.values.first : key klass.unscoped.where(klass.primary_key => fetch_key).first end # Assign a key to a new record. # @param record [ActiveRecord::Base] # @param _payload [Hash,Deimos::SchemaClass::Record] # @param key [Object] # @return [void] def assign_key(record, _payload, key) record[record.class.primary_key] = key end # @param payload [Hash,Deimos::SchemaClass::Record] Decoded payloads # @param metadata [Hash] Information about batch, including keys. # @return [void] def consume(payload, metadata) unless self.process_message?(payload) Deimos.config.logger.debug( message: 'Skipping processing of message', payload: payload, metadata: metadata ) return end key = metadata.with_indifferent_access[:key] klass = self.class.config[:record_class] record = fetch_record(klass, (payload || {}).with_indifferent_access, key) if payload.nil? destroy_record(record) return end if record.blank? record = klass.new assign_key(record, payload, key) end # for backwards compatibility # TODO next major release we should deprecate this attrs = if self.method(:record_attributes).parameters.size == 2 record_attributes(payload.with_indifferent_access, key) else record_attributes(payload.with_indifferent_access) end # don't use attributes= - bypass Rails < 5 attr_protected attrs.each do |k, v| record.send("#{k}=", v) end save_record(record) end # @param record [ActiveRecord::Base] # @return [void] def save_record(record) record.created_at ||= Time.zone.now if record.respond_to?(:created_at) record.updated_at = Time.zone.now if record.respond_to?(:updated_at) record.save! end # Destroy a record that received a null payload. Override if you need # to do something other than a straight destroy (e.g. mark as archived). # @param record [ActiveRecord::Base] # @return [void] def destroy_record(record) record&.destroy end end end end