lib/dirty_pipeline/queue.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/queue.rb in dirty_pipeline-0.6.0

- old
+ new

@@ -1,10 +1,9 @@ module DirtyPipeline class Queue - attr_reader :root - def initialize(operation, subject, transaction_id) - @root = "dirty-pipeline-queue:#{subject.class}:#{subject.id}:" \ + def initialize(operation, subject_class, subject_id, transaction_id) + @root = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:" \ "op_#{operation}:txid_#{transaction_id}" end def clear! DirtyPipeline.with_redis do |r| @@ -12,41 +11,38 @@ r.del events_queue_key end end def to_a - DirtyPipeline.with_redis { |r| r.lrange(events_queue_key, 0, -1) } + DirtyPipeline.with_redis do |r| + r.lrange(events_queue_key, 0, -1).map! do |packed_event| + unpack(packed_event) + end + end end def push(event) DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) } + self end alias :<< :push def unshift(event) DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) } + self end - def dequeue + def pop DirtyPipeline.with_redis do |r| data = r.lpop(events_queue_key) data.nil? ? r.del(active_event_key) : r.set(active_event_key, data) - return unpack(data) + unpack(data) end end - alias :pop :dequeue - def event_in_progress?(event = nil) - if event.nil? - !processing_event.nil? - else - processing_event.id == event.id - end - end - def processing_event - DirtyPipeline.with_redis { |r| unpack r.get(active_event_key) } + DirtyPipeline.with_redis { |r| unpack(r.get(active_event_key)) } end private def pack(event) @@ -70,13 +66,13 @@ } ) end def events_queue_key - "#{root}:events" + "#{@root}:events" end def active_event_key - "#{root}:active" + "#{@root}:active" end end end