Sha256: c323de41297c2d3cf07f4e1ad16d15fcb11278853c383cc65c536d2b9d3c4c9c

Contents?: true

Size: 1.96 KB

Versions: 3

Compression:

Stored size: 1.96 KB

Contents

require "agent/errors"

module Agent
  # A single pending send: holds the object being handed over, lets the sending
  # side block in #wait until a receiver has taken the object (#receive) or the
  # push has been abandoned (#close).
  #
  # All state transitions (@sent / @closed) happen while holding an internal
  # mutex, and every transition wakes the threads blocked in #wait.
  class Push
    # Instances of these types are stored as-is; every other object is copied
    # with a Marshal round-trip unless the caller passes :skip_marshal.
    SKIP_MARSHAL_TYPES = [
      ::Symbol,
      ::Numeric,
      ::NilClass,
      ::TrueClass,
      ::FalseClass,
      ::Queue,
      ::SizedQueue,
      ::Thread,
      ::Mutex,
      ::Monitor,
      ::Module,
      ::IO,
      ::Proc,
      ::Method
    ]

    attr_reader :object, :uuid, :blocking_once, :notifier

    # @param object [Object] the value to hand to the receiver
    # @param options [Hash]
    # @option options [Boolean] :skip_marshal store +object+ itself instead of
    #   a Marshal round-tripped copy
    # @option options [Object] :uuid identifier for this push; defaults to
    #   UUID.generate
    # @option options [#perform] :blocking_once when given, #receive runs the
    #   delivery through its #perform method
    #   (NOTE(review): presumably an Agent::BlockingOnce -- confirm)
    # @option options [#notify] :notifier told about this push (via
    #   notify(self)) after a delivery and after a close
    # @raise [TypeError] from Marshal.dump when +object+ must be copied but
    #   cannot be marshalled
    def initialize(object, options = {})
      @object        = transferable(object, options[:skip_marshal])
      @uuid          = options[:uuid] || UUID.generate
      @blocking_once = options[:blocking_once]
      @notifier      = options[:notifier]
      @mutex         = Mutex.new
      @cvar          = ConditionVariable.new
      @sent          = false
      @closed        = false
    end

    # @return [Boolean] true once a #receive has completed its delivery
    def sent?
      @sent
    end

    # @return [Boolean] true once #close has taken effect
    def closed?
      @closed
    end

    # Blocks the calling thread until the object has been received or the push
    # has been closed. Returns immediately if either already happened.
    #
    # @raise [Errors::ChannelClosed] if the push was closed
    def wait
      @mutex.synchronize do
        # Loop guards against spurious wakeups: only a real state change exits.
        @cvar.wait(@mutex) until @sent || @closed
        raise Errors::ChannelClosed if @closed
      end
    end

    # Hands the stored object to the given block while holding the mutex, then
    # marks the push as sent, wakes every thread blocked in #wait and notifies
    # the notifier (if any).
    #
    # With a +blocking_once+, the delivery is wrapped in its #perform call and
    # the second element of #perform's result is returned. Without one, an
    # Errors::Rollback raised by the block is swallowed and the push stays
    # unsent.
    #
    # @yieldparam object [Object] the stored object
    # @return [Object, nil] the error element from +blocking_once.perform+, or
    #   (without a blocking_once) the notifier's result / nil
    # @raise [Errors::ChannelClosed] if the push was already closed
    def receive(&block)
      @mutex.synchronize do
        raise Errors::ChannelClosed if @closed

        if @blocking_once
          _, error = @blocking_once.perform { deliver(&block) }
          return error
        end

        begin
          deliver(&block)
        rescue Errors::Rollback
          # Receiver backed out: leave the push unsent and report nothing.
          nil
        end
      end
    end

    # Abandons the push: marks it closed, wakes every thread blocked in #wait
    # (which then raise Errors::ChannelClosed) and notifies the notifier.
    # Does nothing if the object has already been received.
    def close
      @mutex.synchronize do
        return if @sent
        @closed = true
        @cvar.broadcast
        @notifier.notify(self) if @notifier
      end
    end

    private

    # Returns the object to store: +object+ itself for exempt types or when
    # +skip_marshal+ is truthy, otherwise a Marshal round-tripped copy.
    def transferable(object, skip_marshal)
      case object
      when *SKIP_MARSHAL_TYPES
        object
      else
        skip_marshal ? object : Marshal.load(Marshal.dump(object))
      end
    end

    # Yields the object to the receiver and records the successful delivery.
    # Must be called with @mutex held. Anything raised by the block propagates
    # before the push is marked as sent.
    def deliver
      yield @object
      @sent = true
      # broadcast (not signal): #wait may have several callers, and all of
      # them must observe the delivery, just as they all observe a close.
      @cvar.broadcast
      @notifier.notify(self) if @notifier
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
agent-0.12.0 lib/agent/push.rb
agent-0.11.0 lib/agent/push.rb
agent-0.10.0 lib/agent/push.rb