Sha256: 36a64db9c30e10a5e3cd4e50a1f1cb1e6897987fd8d2eeeed9c8d4cc0e2ec05c
Contents?: true
Size: 1.86 KB
Versions: 1
Compression:
Stored size: 1.86 KB
Contents
module DirtyPipeline
  # Redis-backed FIFO queue of pipeline events, scoped to one
  # (subject, operation, transaction) triple.
  #
  # Two Redis keys are derived from {#root}:
  # * "<root>:events" - a list holding the JSON-packed pending events;
  # * "<root>:active" - a string holding the JSON-packed event that was
  #   most recently dequeued.
  class Queue
    # @return [String] common prefix of every Redis key used by this queue
    attr_reader :root

    # @param operation [#to_s] operation name, interpolated into the key
    # @param subject [#id] object the queue belongs to; its class and id
    #   are interpolated into the key
    # @param transaction_id [#to_s] transaction id, interpolated into the key
    def initialize(operation, subject, transaction_id)
      @root = "dirty-pipeline-queue:#{subject.class}:#{subject.id}:" \
              "op_#{operation}:txid_#{transaction_id}"
    end

    # Deletes both the active-event key and the pending-events list.
    def clear!
      DirtyPipeline.with_redis do |r|
        r.del active_event_key
        r.del events_queue_key
      end
    end

    # @return [Array<String>] the pending events exactly as stored, i.e.
    #   still JSON-packed (they are not passed through #unpack)
    def to_a
      DirtyPipeline.with_redis { |r| r.lrange(events_queue_key, 0, -1) }
    end

    # Appends an event to the tail of the queue.
    #
    # @param event [#id, #tx_id, #transition, #args]
    def push(event)
      DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) }
    end
    alias << push

    # Puts an event at the head of the queue, so it is dequeued next.
    #
    # @param event [#id, #tx_id, #transition, #args]
    def unshift(event)
      DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) }
    end

    # Removes the head of the queue and records it as the active event.
    # When the queue is empty the active-event key is deleted instead.
    #
    # @return [Event, nil] the dequeued event, or nil when the queue is empty
    def dequeue
      DirtyPipeline.with_redis do |r|
        data = r.lpop(events_queue_key)
        if data.nil?
          r.del(active_event_key)
        else
          r.set(active_event_key, data)
        end
        unpack(data)
      end
    end
    alias pop dequeue

    # Tells whether an event is currently recorded as active.
    #
    # @param event [#id, nil] when given, the active event must have the
    #   same id; when omitted, any active event counts
    # @return [Boolean] false whenever nothing is active
    def event_in_progress?(event = nil)
      current = processing_event
      # Nothing is active: answer false instead of calling #id on nil.
      return false if current.nil?

      event.nil? || current.id == event.id
    end

    # @return [Event, nil] the event stored under the active-event key,
    #   or nil when that key is absent
    def processing_event
      DirtyPipeline.with_redis { |r| unpack r.get(active_event_key) }
    end

    private

    # Serializes the event fields needed to rebuild it later.
    #
    # @return [String] JSON document with "evid", "txid", "transit", "args"
    def pack(event)
      JSON.dump(
        "evid" => event.id,
        "txid" => event.tx_id,
        "transit" => event.transition,
        "args" => event.args
      )
    end

    # Rebuilds an Event from a string produced by #pack.
    #
    # @param packed_event [String, nil]
    # @return [Event, nil] nil when +packed_event+ is nil/false
    def unpack(packed_event)
      return unless packed_event

      # NOTE(review): JSON.load is kept to preserve behaviour. If payloads in
      # Redis could ever be written by an untrusted party, JSON.parse is the
      # safer call - confirm and switch.
      fields = JSON.load(packed_event)
      Event.new(
        data: {
          "uuid" => fields["evid"],
          "transaction_uuid" => fields["txid"],
          "transition" => fields["transit"],
          "args" => fields["args"]
        }
      )
    end

    # @return [String] Redis key of the pending-events list
    def events_queue_key
      "#{root}:events"
    end

    # @return [String] Redis key holding the active event
    def active_event_key
      "#{root}:active"
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dirty_pipeline-0.5.0 | lib/dirty_pipeline/queue.rb |