Sha256: c0e6dd262d1c6b4ba74844f3e6ae24fc639b951db8c0d67fd167758af1c5f776

Contents?: true

Size: 1.44 KB

Versions: 2

Compression:

Stored size: 1.44 KB

Contents

class Pigato::Base

  @@sockets = {}
  @@sockets_ids = {}
  @@mtxs = {}
  @@mtx = Mutex.new
  @@global_heartbeat_at = Time.now
  @@global_thread = nil

  def init 
    @iid = SecureRandom.uuid
  end
 
  def get_thread_id
    tid = get_proc_id() + "#" + Thread.current.object_id.to_s
    tid
  end

  def get_proc_id
    pid = "#" + Process.pid.to_s
    pid
  end

  def get_iid 
    iid = get_thread_id + '#' + @iid
    iid
  end

  def get_socket
    socket = @@sockets[get_iid]
    socket
  end

  def get_mtx
    tid = get_thread_id

    if @@mtxs[tid].nil?
      @@mtxs[tid] = Mutex.new
    end 

    return @@mtxs[tid]
  end

  def sock_create
    @@mtx.synchronize {
      pid = get_proc_id()
 
      ctx = ZMQ::context
      if ctx == nil then
        ctx = ZMQ::Context.new
        ctx.linger = 0
      end

      socket = ctx.socket ZMQ::DEALER
      sid = SecureRandom.uuid
      socket.identity = sid 
      socket.connect @broker

      if !@conf[:timeout].nil? then
        socket.rcvtimeo = @conf[:timeout]
      end

      @@sockets[get_iid] = socket
      @@sockets_ids[get_iid] = sid 
    }
  end
  
  def sock_close
    @@mtx.synchronize {
      pid = get_proc_id()

      iid = get_iid

      socket = @@sockets[iid]
      if socket
        begin
          socket.close
        rescue
        end
        @@sockets.delete(iid)
        @@sockets_ids.delete(iid)
      end
    }
  end

  def start
    @active = 1
  end

  def stop
    @active = 0
  end

end 

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
pigato-0.5.1 lib/pigato/base.rb
pigato-0.5.0 lib/pigato/base.rb