Sha256: 48ad7cbabde57a525fad45c7b6fa92294aa805c658001851262fa9e1ac3a0d37
Contents?: true
Size: 1.92 KB
Versions: 3
Compression:
Stored size: 1.92 KB
Contents
module DirtyPipeline module Redis class Queue def initialize(operation, subject_class, subject_id, transaction_id) @root = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:" \ "op_#{operation}:txid_#{transaction_id}" end def clear! DirtyPipeline.with_redis do |r| r.del active_event_key r.del events_queue_key end end def to_a DirtyPipeline.with_redis do |r| r.lrange(events_queue_key, 0, -1).map! do |packed_event| unpack(packed_event) end end end def push(event) DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) } self end alias :<< :push def unshift(event) DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) } self end def pop DirtyPipeline.with_redis do |r| data = r.lpop(events_queue_key) data.nil? ? r.del(active_event_key) : r.set(active_event_key, data) unpack(data) end end def processing_event DirtyPipeline.with_redis { |r| unpack(r.get(active_event_key)) } end private def pack(event) JSON.dump( "evid" => event.id, "txid" => event.tx_id, "transit" => event.transition, "args" => event.args, ) end def unpack(packed_event) return unless packed_event unpacked_event = JSON.load(packed_event) Event.new( data: { "uuid" => unpacked_event["evid"], "transaction_uuid" => unpacked_event["txid"], "transition" => unpacked_event["transit"], "args" => unpacked_event["args"], } ) end def events_queue_key "#{@root}:events" end def active_event_key "#{@root}:active" end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
dirty_pipeline-0.8.3 | lib/dirty_pipeline/redis/queue.rb |
dirty_pipeline-0.8.2 | lib/dirty_pipeline/redis/queue.rb |
dirty_pipeline-0.8.1 | lib/dirty_pipeline/redis/queue.rb |