Sha256: 36a64db9c30e10a5e3cd4e50a1f1cb1e6897987fd8d2eeeed9c8d4cc0e2ec05c

Contents?: true

Size: 1.86 KB

Versions: 1

Compression:

Stored size: 1.86 KB

Contents

module DirtyPipeline
  class Queue
    attr_reader :root
    def initialize(operation, subject, transaction_id)
      @root = "dirty-pipeline-queue:#{subject.class}:#{subject.id}:" \
              "op_#{operation}:txid_#{transaction_id}"
    end

    def clear!
      DirtyPipeline.with_redis do |r|
        r.del active_event_key
        r.del events_queue_key
      end
    end

    def to_a
      DirtyPipeline.with_redis { |r| r.lrange(events_queue_key, 0, -1) }
    end

    def push(event)
      DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) }
    end
    alias :<< :push

    def unshift(event)
      DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) }
    end

    def dequeue
      DirtyPipeline.with_redis do |r|
        data = r.lpop(events_queue_key)
        data.nil? ? r.del(active_event_key) : r.set(active_event_key, data)
        return unpack(data)
      end
    end
    alias :pop :dequeue

    def event_in_progress?(event = nil)
      if event.nil?
        !processing_event.nil?
      else
        processing_event.id == event.id
      end
    end

    def processing_event
      DirtyPipeline.with_redis { |r| unpack r.get(active_event_key) }
    end

    private

    def pack(event)
      JSON.dump(
        "evid" => event.id,
        "txid" => event.tx_id,
        "transit" => event.transition,
        "args" => event.args,
      )
    end

    def unpack(packed_event)
      return unless packed_event
      unpacked_event = JSON.load(packed_event)
      Event.new(
        data: {
          "uuid" => unpacked_event["evid"],
          "transaction_uuid" => unpacked_event["txid"],
          "transition" => unpacked_event["transit"],
          "args" => unpacked_event["args"],
        }
      )
    end

    def events_queue_key
      "#{root}:events"
    end

    def active_event_key
      "#{root}:active"
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
dirty_pipeline-0.5.0 lib/dirty_pipeline/queue.rb