lib/dirty_pipeline/queue.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/queue.rb in dirty_pipeline-0.6.0
- old
+ new
@@ -1,10 +1,9 @@
module DirtyPipeline
class Queue
- attr_reader :root
- def initialize(operation, subject, transaction_id)
- @root = "dirty-pipeline-queue:#{subject.class}:#{subject.id}:" \
+ def initialize(operation, subject_class, subject_id, transaction_id)
+ @root = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:" \
"op_#{operation}:txid_#{transaction_id}"
end
def clear!
DirtyPipeline.with_redis do |r|
@@ -12,41 +11,38 @@
r.del events_queue_key
end
end
def to_a
- DirtyPipeline.with_redis { |r| r.lrange(events_queue_key, 0, -1) }
+ DirtyPipeline.with_redis do |r|
+ r.lrange(events_queue_key, 0, -1).map! do |packed_event|
+ unpack(packed_event)
+ end
+ end
end
def push(event)
DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) }
+ self
end
alias :<< :push
def unshift(event)
DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) }
+ self
end
- def dequeue
+ def pop
DirtyPipeline.with_redis do |r|
data = r.lpop(events_queue_key)
data.nil? ? r.del(active_event_key) : r.set(active_event_key, data)
- return unpack(data)
+ unpack(data)
end
end
- alias :pop :dequeue
- def event_in_progress?(event = nil)
- if event.nil?
- !processing_event.nil?
- else
- processing_event.id == event.id
- end
- end
-
def processing_event
- DirtyPipeline.with_redis { |r| unpack r.get(active_event_key) }
+ DirtyPipeline.with_redis { |r| unpack(r.get(active_event_key)) }
end
private
def pack(event)
@@ -70,13 +66,13 @@
}
)
end
def events_queue_key
- "#{root}:events"
+ "#{@root}:events"
end
def active_event_key
- "#{root}:active"
+ "#{@root}:active"
end
end
end