Sha256: 2206089fbcd50c867bc48a6e41a63a1ec74e1c3cc05bd3a81becd9df6b2c30b2

Contents?: true

Size: 1.28 KB

Versions: 1

Compression:

Stored size: 1.28 KB

Contents

class Pigato::Base

  @@sockets = {}
  @@mtxs = {}
  @@mtx = Mutex.new

  def init 
    @iid = SecureRandom.uuid
  end
 
  def get_thread_id
    tid = get_proc_id() + "#" + Thread.current.object_id.to_s
    tid
  end

  def get_proc_id
    pid = "#" + Process.pid.to_s
    pid
  end

  def get_iid 
    iid = get_thread_id + '#' + @iid
    iid
  end

  def get_socket
    socket = @@sockets[get_iid]
    socket
  end

  def get_mtx
    tid = get_thread_id

    if @@mtxs[tid].nil?
      @@mtxs[tid] = Mutex.new
    end 

    return @@mtxs[tid]
  end

  def sock_create
    @@mtx.synchronize {
      pid = get_proc_id()
 
      ctx = ZMQ::context
      if ctx == nil then
        ctx = ZMQ::Context.new
        ctx.linger = 0
      end

      socket = ctx.socket ZMQ::DEALER
      socket.identity = SecureRandom.uuid
      socket.connect @broker

      if !@conf[:timeout].nil? then
        socket.rcvtimeo = @conf[:timeout];
      end

      @@sockets[get_iid] = socket
    }
  end
  
  def sock_close
    @@mtx.synchronize {
      pid = get_proc_id()

      iid = get_iid

      socket = @@sockets[iid]
      if socket
        begin
          socket.close
        rescue
        end
        @@sockets.delete(iid)
      end
    }
  end

  def start
    @active = 1
  end

  def stop
    @active = 0
  end

end 

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pigato-0.4.2 lib/pigato/base.rb