lib/redstream/model.rb in redstream-0.1.1 vs lib/redstream/model.rb in redstream-0.2.0

- old
+ new

@@ -10,10 +10,12 @@ # # redstream_callbacks # end module Model + IVAR_DELAY_MESSAGE_ID = :@__redstream_delay_message_id__ + def self.included(base) base.extend(ClassMethods) end module ClassMethods @@ -27,14 +29,32 @@ # # @param producer [Redstream::Producer] A Redstream::Producer that is # responsible for writing to a redis stream def redstream_callbacks(producer: Producer.new) - after_save { |object| producer.delay(object) if object.saved_changes.present? } - after_touch { |object| producer.delay(object) } - after_destroy { |object| producer.delay(object) } - after_commit(on: [:create, :update]) { |object| producer.queue(object) if object.saved_changes.present? } - after_commit(on: :destroy) { |object| producer.queue(object) } + after_save { |object| instance_variable_set(IVAR_DELAY_MESSAGE_ID, producer.delay(object)) if object.saved_changes.present? } + after_touch { |object| instance_variable_set(IVAR_DELAY_MESSAGE_ID, producer.delay(object)) } + after_destroy { |object| instance_variable_set(IVAR_DELAY_MESSAGE_ID, producer.delay(object)) } + + after_commit(on: [:create, :update]) do |object| + if object.saved_changes.present? + producer.queue(object) + + if id = instance_variable_get(IVAR_DELAY_MESSAGE_ID) + producer.delete(object, id) + remove_instance_variable(IVAR_DELAY_MESSAGE_ID) + end + end + end + + after_commit(on: :destroy) do |object| + producer.queue(object) + + if id = instance_variable_get(IVAR_DELAY_MESSAGE_ID) + producer.delete(object, id) + remove_instance_variable(IVAR_DELAY_MESSAGE_ID) + end + end end def redstream_name name.pluralize.underscore end