Sha256: 59fd4d798596bf48237579616d12fc4f99bfb7a6e2a27525471302ae0516cf13

Contents?: true

Size: 1.4 KB

Versions: 2

Compression:

Stored size: 1.4 KB

Contents

require "agent/errors"

module Agent
  class Pop
    attr_reader :uuid, :blocking_once, :notifier, :object

    def initialize(options={})
      @object        = nil
      @uuid          = options[:uuid] || UUID.generate
      @blocking_once = options[:blocking_once]
      @notifier      = options[:notifier]
      @mutex         = Mutex.new
      @cvar          = ConditionVariable.new
      @received      = false
      @closed        = false
    end

    def received?
      @received
    end

    def closed?
      @closed
    end

    def wait
      @mutex.synchronize do
        until @received || @closed
          @cvar.wait(@mutex)
        end
        return received?
      end
    end

    def send
      @mutex.synchronize do
        if @blocking_once
          _, error = @blocking_once.perform do
            @object = yield unless @closed
            @received = true
            @cvar.signal
            @notifier.notify(self) if @notifier
          end

          return error
        else
          begin
            @object = yield unless @closed
            @received = true
            @cvar.signal
            @notifier.notify(self) if @notifier
          rescue Errors::Rollback
          end
        end
      end
    end

    def close
      @mutex.synchronize do
        return if @received
        @closed = true
        @cvar.broadcast
        @notifier.notify(self) if @notifier
      end
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
agent-0.11.0 lib/agent/pop.rb
agent-0.10.0 lib/agent/pop.rb