lib/redstream/model.rb in redstream-0.1.1 vs lib/redstream/model.rb in redstream-0.2.0
- old
+ new
@@ -10,10 +10,12 @@
#
# redstream_callbacks
# end
module Model
+ IVAR_DELAY_MESSAGE_ID = :@__redstream_delay_message_id__
+
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
@@ -27,14 +29,32 @@
#
# @param producer [Redstream::Producer] A Redstream::Producer that is
# responsible for writing to a redis stream
def redstream_callbacks(producer: Producer.new)
- after_save { |object| producer.delay(object) if object.saved_changes.present? }
- after_touch { |object| producer.delay(object) }
- after_destroy { |object| producer.delay(object) }
- after_commit(on: [:create, :update]) { |object| producer.queue(object) if object.saved_changes.present? }
- after_commit(on: :destroy) { |object| producer.queue(object) }
+ after_save { |object| instance_variable_set(IVAR_DELAY_MESSAGE_ID, producer.delay(object)) if object.saved_changes.present? }
+ after_touch { |object| instance_variable_set(IVAR_DELAY_MESSAGE_ID, producer.delay(object)) }
+ after_destroy { |object| instance_variable_set(IVAR_DELAY_MESSAGE_ID, producer.delay(object)) }
+
+ after_commit(on: [:create, :update]) do |object|
+ if object.saved_changes.present?
+ producer.queue(object)
+
+ if id = instance_variable_get(IVAR_DELAY_MESSAGE_ID)
+ producer.delete(object, id)
+ remove_instance_variable(IVAR_DELAY_MESSAGE_ID)
+ end
+ end
+ end
+
+ after_commit(on: :destroy) do |object|
+ producer.queue(object)
+
+ if id = instance_variable_get(IVAR_DELAY_MESSAGE_ID)
+ producer.delete(object, id)
+ remove_instance_variable(IVAR_DELAY_MESSAGE_ID)
+ end
+ end
end
def redstream_name
name.pluralize.underscore
end