Sha256: 3d0ae5c3c3721d41764b086d24a76cf9d00e64b51477373c6a963829c63ae494

Contents?: true

Size: 1.42 KB

Versions: 7

Compression:

Stored size: 1.42 KB

Contents

class Pigato::Base

  @@sockets = {}
  @@sockets_ids = {}
  @@mtxs = {}
  @@mtx = Mutex.new
  @@global_heartbeat_at = Time.now

  def init 
    @iid = SecureRandom.uuid
  end
 
  def get_thread_id
    tid = get_proc_id() + "#" + Thread.current.object_id.to_s
    tid
  end

  def get_proc_id
    pid = "#" + Process.pid.to_s
    pid
  end

  def get_iid 
    iid = get_thread_id + '#' + @iid
    iid
  end

  def get_socket
    socket = @@sockets[get_iid]
    socket
  end

  def get_mtx
    tid = get_thread_id

    if @@mtxs[tid].nil?
      @@mtxs[tid] = Mutex.new
    end 

    return @@mtxs[tid]
  end

  def sock_create
    @@mtx.synchronize {
      pid = get_proc_id()
 
      ctx = ZMQ::context
      if ctx == nil then
        ctx = ZMQ::Context.new
        ctx.linger = 0
      end

      socket = ctx.socket ZMQ::DEALER
      sid = SecureRandom.uuid
      socket.identity = sid 
      socket.connect @broker

      if !@conf[:timeout].nil? then
        socket.rcvtimeo = @conf[:timeout]
      end

      @@sockets[get_iid] = socket
      @@sockets_ids[get_iid] = sid 
    }
  end
  
  def sock_close
    @@mtx.synchronize {
      pid = get_proc_id()

      iid = get_iid

      socket = @@sockets[iid]
      if socket
        begin
          socket.close
        rescue
        end
        @@sockets.delete(iid)
        @@sockets_ids.delete(iid)
      end
    }
  end

  def start
    @active = 1
  end

  def stop
    @active = 0
  end

end 

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
pigato-0.4.9 lib/pigato/base.rb
pigato-0.4.8 lib/pigato/base.rb
pigato-0.4.7 lib/pigato/base.rb
pigato-0.4.6 lib/pigato/base.rb
pigato-0.4.5 lib/pigato/base.rb
pigato-0.4.4 lib/pigato/base.rb
pigato-0.4.3 lib/pigato/base.rb