Sha256: 067363849584b019c8666fe20616828eb56697c7ac5f6a6df749419c56f38df6

Contents?: true

Size: 1.77 KB

Versions: 7

Compression:

Stored size: 1.77 KB

Contents

module DirtyPipeline
  class Queue
    def initialize(operation, subject_class, subject_id, transaction_id)
      @root = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:" \
              "op_#{operation}:txid_#{transaction_id}"
    end

    def clear!
      DirtyPipeline.with_redis do |r|
        r.del active_event_key
        r.del events_queue_key
      end
    end

    def to_a
      DirtyPipeline.with_redis do |r|
        r.lrange(events_queue_key, 0, -1).map! do |packed_event|
          unpack(packed_event)
        end
      end
    end

    def push(event)
      DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) }
      self
    end
    alias :<< :push

    def unshift(event)
      DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) }
      self
    end

    def pop
      DirtyPipeline.with_redis do |r|
        data = r.lpop(events_queue_key)
        data.nil? ? r.del(active_event_key) : r.set(active_event_key, data)
        unpack(data)
      end
    end

    def processing_event
      DirtyPipeline.with_redis { |r| unpack(r.get(active_event_key)) }
    end

    private

    def pack(event)
      JSON.dump(
        "evid" => event.id,
        "txid" => event.tx_id,
        "transit" => event.transition,
        "args" => event.args,
      )
    end

    def unpack(packed_event)
      return unless packed_event
      unpacked_event = JSON.load(packed_event)
      Event.new(
        data: {
          "uuid" => unpacked_event["evid"],
          "transaction_uuid" => unpacked_event["txid"],
          "transition" => unpacked_event["transit"],
          "args" => unpacked_event["args"],
        }
      )
    end

    def events_queue_key
      "#{@root}:events"
    end

    def active_event_key
      "#{@root}:active"
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
dirty_pipeline-0.7.1 lib/dirty_pipeline/queue.rb
dirty_pipeline-0.7.0 lib/dirty_pipeline/queue.rb
dirty_pipeline-0.6.4 lib/dirty_pipeline/queue.rb
dirty_pipeline-0.6.3 lib/dirty_pipeline/queue.rb
dirty_pipeline-0.6.2 lib/dirty_pipeline/queue.rb
dirty_pipeline-0.6.1 lib/dirty_pipeline/queue.rb
dirty_pipeline-0.6.0 lib/dirty_pipeline/queue.rb