Sha256: 744d081150fe87ec8218e2cbf04c75c88d2a234776bde2584b0f307bf71ba13b

Contents?: true

Size: 1.27 KB

Versions: 4

Compression:

Stored size: 1.27 KB

Contents

# Base class providing per-process / per-thread / per-instance socket
# bookkeeping. Sockets are kept in a class-level registry keyed by #get_iid,
# so every thread that uses an instance gets its own socket entry.
#
# Instance state read here but assigned elsewhere (not visible in this file):
# @broker (endpoint passed to socket.connect) and @conf (hash; only
# @conf[:timeout] is consulted here).
class Pigato::Base

  # Open sockets, shared by all instances; keyed by the string from #get_iid.
  @@sockets = {}
  # Mutexes handed out by #get_mtx; keyed by the string from #get_thread_id.
  @@mtxs = {}
  # Serializes the bodies of #sock_create and #sock_close.
  @@mtx = Mutex.new

  # Assigns this instance a random UUID. Must run before #get_iid,
  # #get_socket, #sock_create or #sock_close, which all depend on @iid.
  def init
    @iid = SecureRandom.uuid
  end

  # @return [String] "#<pid>#<object_id of the current thread>"
  def get_thread_id
    "#{get_proc_id}##{Thread.current.object_id}"
  end

  # @return [String] "#<pid>" for the current process
  def get_proc_id
    "##{Process.pid}"
  end

  # Registry key for this instance on the current thread.
  #
  # Plain concatenation is kept on purpose: if #init has not been called,
  # @iid is nil and this raises TypeError rather than silently producing a
  # key shared by every uninitialized instance.
  #
  # @return [String] "#<pid>#<thread object_id>#<instance uuid>"
  def get_iid
    get_thread_id + '#' + @iid
  end

  # @return [Object, nil] the socket registered under #get_iid, or nil when
  #   none has been created (or it has been closed) for this thread/instance
  def get_socket
    @@sockets[get_iid]
  end

  # Returns the mutex associated with the current thread, creating it on
  # first use. The lookup itself is not guarded by @@mtx; each thread only
  # touches the entry stored under its own #get_thread_id key.
  #
  # @return [Mutex]
  def get_mtx
    @@mtxs[get_thread_id] ||= Mutex.new
  end

  # Creates a DEALER socket with a random identity, connects it to @broker
  # and registers it under #get_iid. An entry already stored under that key
  # is replaced without being closed.
  #
  # @return [Object] the newly registered socket
  def sock_create
    @@mtx.synchronize do
      # Reuse the context reported by ZMQ.context when there is one;
      # otherwise create a new one with linger set to 0.
      # NOTE(review): exact semantics of ZMQ.context / linger depend on the
      # ZMQ binding in use - confirm against its documentation.
      ctx = ZMQ.context
      if ctx.nil?
        ctx = ZMQ::Context.new
        ctx.linger = 0
      end

      socket = ctx.socket ZMQ::DEALER
      socket.identity = SecureRandom.uuid
      socket.connect @broker

      # Optional receive timeout; units are whatever the binding's rcvtimeo
      # expects (presumably milliseconds - TODO confirm).
      socket.rcvtimeo = @conf[:timeout] if @conf[:timeout]

      @@sockets[get_iid] = socket
    end
  end

  # Closes and unregisters the socket stored under #get_iid, if any.
  #
  # @return [Object, nil] the removed socket, or nil when nothing was
  #   registered for this thread/instance
  def sock_close
    @@mtx.synchronize do
      iid = get_iid

      socket = @@sockets[iid]
      if socket
        begin
          socket.close
        rescue StandardError
          # Best-effort close: the entry is dropped from the registry even
          # when the underlying close fails.
        end
        @@sockets.delete(iid)
      end
    end
  end

  # Marks the instance as active. The flag is the Integer 1, not true.
  def start
    @active = 1
  end

  # Marks the instance as inactive. The flag is the Integer 0, which is
  # still truthy in Ruby, so readers must compare against 0/1 explicitly.
  def stop
    @active = 0
  end

end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
pigato-0.4.1 lib/pigato/base.rb
pigato-0.4.0 lib/pigato/base.rb
pigato-0.3.1 lib/pigato/base.rb
pigato-0.3.0 lib/pigato/base.rb