# encoding: utf-8

require File.join(File.dirname(__FILE__), 'helper')

class TestZmqSocket < ZmqTestCase
  def test_fd
    ctx = ZMQ::Context.new
    sock = ctx.socket(:REP)
    assert Fixnum === sock.fd
    assert_equal(-1, sock.fd)
    assert_equal sock.fd, sock.to_i
  ensure
    ctx.destroy
  end

  def test_type
    ctx = ZMQ::Context.new
    sock = ctx.socket(:REP)
    assert_equal ZMQ::REP, sock.type
  ensure
    ctx.destroy
  end

  def test_readable_p
    ctx = ZMQ::Context.new
    rep = ctx.socket(:REP)
    rep.bind("inproc://test.socket-readable_p")
    req = ctx.connect(:REQ, "inproc://test.socket-readable_p")
    assert req.writable?
    req.send("m")
    sleep 0.1
    assert rep.readable?
  ensure
    ctx.destroy
  end

  def test_send_socket
    ctx = ZMQ::Context.new
    push = ctx.socket(:PUSH)
    assert_raises ZMQ::Error do
      push.recv
    end
  ensure
    ctx.destroy
  end

  def test_receive_socket
    ctx = ZMQ::Context.new
    pull = ctx.socket(:PULL)
    assert_raises ZMQ::Error do
      pull.send("message")
    end
  ensure
    ctx.destroy
  end

  def test_recv_timeout
    ctx = ZMQ::Context.new
    sock = ctx.socket(:REP)
    assert_nil sock.recv_timeout
    sock.recv_timeout = 10
    assert_equal 10, sock.recv_timeout
    assert_raises TypeError do
      sock.recv_timeout = :x
    end
  ensure
    ctx.destroy
  end

  def test_send_timeout
    ctx = ZMQ::Context.new
    sock = ctx.socket(:REP)
    assert_nil sock.send_timeout
    sock.send_timeout = 10
    assert_equal 10, sock.send_timeout
    assert_raises TypeError do
      sock.send_timeout = :x
    end
  ensure
    ctx.destroy
  end

  def test_gc_context_reaped
    pub = ZMQ::Context.new.socket(:PUB)
    GC.start
    pub.bind("inproc://test.socket-gc_context_reaped")
    GC.start
    pub.send("test")
    GC.start
    pub.close
  ensure
    ZMQ.context.destroy
  end

  def test_bind
    ctx = ZMQ::Context.new
    sock = ctx.socket(:REP)
    assert(sock.state & ZMQ::Socket::PENDING)
    port = sock.bind("tcp://127.0.0.1:*")
    assert sock.fd != -1
    assert(sock.state & ZMQ::Socket::BOUND)
    tcp_sock = nil
    assert_nothing_raised do
      tcp_sock = TCPSocket.new("127.0.0.1", port)
    end
  ensure
    ctx.destroy
    tcp_sock.close if tcp_sock
  end

  def test_connect
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    port = rep.bind("inproc://test.socket-connect")
    req = ctx.socket(:PAIR)
    assert(req.state & ZMQ::Socket::PENDING)
    req.connect("inproc://test.socket-connect")
    assert req.fd != -1
    assert(req.state & ZMQ::Socket::CONNECTED)
  ensure
    ctx.destroy
  end

  def test_bind_connect_errors
    ctx = ZMQ::Context.new
    req = ctx.socket(:REQ)
    rep = ctx.socket(:REP)
    assert_raises Errno::EINVAL do
      rep.bind "bad uri"
    end
    assert_raises Errno::EINVAL do
      req.connect "bad uri"
    end
  ensure
    ctx.destroy
  end

  def test_to_s
    ctx = ZMQ::Context.new
    sock = ctx.socket(:PAIR)
    rep = ctx.socket(:REP)
    port = rep.bind("tcp://127.0.0.1:*")
    req = ctx.socket(:REQ)
    assert(req.state & ZMQ::Socket::PENDING)
    req.connect("tcp://127.0.0.1:#{port}")
    assert_equal "PAIR socket", sock.to_s
    assert_equal "REP socket bound to tcp://127.0.0.1:*", rep.to_s
    assert_equal "REQ socket connected to tcp://127.0.0.1:#{port}", req.to_s
  ensure
    ctx.destroy
  end

  def test_endpoint
    ctx = ZMQ::Context.new
    rep = ctx.socket(:REP)
    port = rep.bind("tcp://127.0.0.1:*")
    req = ctx.socket(:REQ)
    req.connect("tcp://127.0.0.1:#{port}")
    assert_equal "tcp://127.0.0.1:*", rep.endpoint
    assert_equal "tcp://127.0.0.1:#{port}", req.endpoint
  ensure
    ctx.destroy
  end

  def test_close
    ctx = ZMQ::Context.new
    sock = ctx.socket(:REP)
    port = sock.bind("tcp://127.0.0.1:*")
    assert sock.fd != -1
    other = ctx.socket(:REQ)
    other.connect("tcp://127.0.0.1:#{port}")
    other.send("test")
    assert_equal "test", sock.recv
    sock.close
    other.close
    # zsocket_destroy in libczmq recycles the socket instance. libzmq don't support / expose underlying
    # connection state or teardown through public API, thus we can't assert a PENDING socket state on close
    # through the Ruby API as the socket instance has already been recycled.
    begin
      assert_equal ZMQ::Socket::PENDING, sock.state
    rescue => e
      assert_instance_of ZMQ::Error, e
      assert_match(/ZMQ::Socket instance \w* has been destroyed by the ZMQ framework/, e.message)
    end
    sleep 0.2
    assert_raises Errno::ECONNREFUSED do
      TCPSocket.new("127.0.0.1", port)
    end
  ensure
    ctx.destroy
  end

  def test_send_receive
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.bind("inproc://test.socket-send_receive")
    req = ctx.socket(:PAIR)
    req.connect("inproc://test.socket-send_receive")
    assert req.send("ping")
    assert_equal "ping", rep.recv
  ensure
    ctx.destroy
  end

  def test_verbose
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.verbose = true
    rep.bind("inproc://test.socket-verbose")
    req = ctx.socket(:PAIR)
    req.verbose = true
    req.connect("inproc://test.socket-verbose")
    assert req.send("ping")
    assert_equal "ping", rep.recv
    req.send_frame(ZMQ::Frame("frame"))
    assert_equal ZMQ::Frame("frame"), rep.recv_frame
  ensure
    ctx.destroy
  end

  def test_receive_nonblock
    ctx = ZMQ::Context.new
    rep = ctx.socket(:REP)
    port = rep.bind("tcp://127.0.0.1:*")
    req = ctx.socket(:REQ)
    req.connect("tcp://127.0.0.1:#{port}")
    assert req.send("ping")
    assert_equal nil, rep.recv_nonblock
    sleep 0.2
    assert_equal "ping", rep.recv_nonblock
  ensure
    ctx.destroy
  end

  def test_send_multi
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.bind("inproc://test.socket-send_multi")
    req = ctx.socket(:PAIR)
    req.connect("inproc://test.socket-send_multi")
    assert req.sendm("batch")
    req.sendm("of")
    req.send("messages")
    assert_equal "batch", rep.recv
    assert_equal "of", rep.recv
    assert_equal "messages", rep.recv
  ensure
    ctx.destroy
  end

  def test_send_receive_frame
    ctx = ZMQ::Context.new
    rep = ctx.socket(:REP)
    port = rep.bind("tcp://127.0.0.1:*")
    req = ctx.socket(:REQ)
    req.connect("tcp://127.0.0.1:#{port}")
    ping = ZMQ::Frame("ping")
    assert req.send_frame(ping)
    assert_equal ZMQ::Frame("ping"), rep.recv_frame
    assert rep.send_frame(ZMQ::Frame("pong"))
    assert_equal ZMQ::Frame("pong"), req.recv_frame
    assert req.send_frame(ZMQ::Frame("pong"))
    frame = rep.recv_frame_nonblock
    if frame
      assert_equal ZMQ::Frame("pong"), frame
    else
      sleep 0.3
      assert_equal ZMQ::Frame("pong"), rep.recv_frame_nonblock
    end
  ensure
    ctx.destroy
  end

  def test_send_frame_more
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.bind("inproc://test.socket-send_frame_more")
    req = ctx.socket(:PAIR)
    req.connect("inproc://test.socket-send_frame_more")
    5.times do |i|
      frame = ZMQ::Frame("m#{i}")
      req.send_frame(frame, ZMQ::Frame::MORE)
    end
    req.send_frame(ZMQ::Frame("m6"))
    expected, frames = %w(m0 m1 m2 m3 m4), []
    5.times do
      frames << rep.recv_frame.data
    end
    assert_equal expected, frames
  ensure
    ctx.destroy
  end

  def test_send_frame_reuse
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.bind("inproc://test.socket-send_frame_reuse")
    req = ctx.socket(:PAIR)
    req.connect("inproc://test.socket-send_frame_reuse")
    frame = ZMQ::Frame("reused_frame")
    5.times do |i|
      req.send_frame(frame, :REUSE)
    end
    expected, frames = ( %w(reused_frame) * 5), []
    5.times do
      frames << rep.recv_frame.data
    end
    assert_equal expected, frames
  ensure
    ctx.destroy
  end

  def test_send_frame_dontwait
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.bind("inproc://test.socket-send_frame_dontwait")
    req = ctx.socket(:PAIR)
    req.connect("inproc://test.socket-send_frame_dontwait")
    5.times do |i|
      frame = ZMQ::Frame("m#{i}")
      req.send_frame(frame, ZMQ::Frame::DONTWAIT)
    end
    expected, frames = %w(m0 m1 m2 m3 m4), []
    5.times do
      frames << rep.recv_frame.data
    end
    assert_equal expected, frames
  ensure
    ctx.destroy
  end

  def test_send_receive_message
    ctx = ZMQ::Context.new
    rep = ctx.socket(:PAIR)
    rep.verbose = true
    rep.bind("inproc://test.socket-send_receive_message")
    req = ctx.socket(:PAIR)
    req.verbose = true
    req.connect("inproc://test.socket-send_receive_message")

    msg = ZMQ::Message.new
    msg.push ZMQ::Frame("header")

    assert_nil req.send_message(msg)

    recvd_msg = rep.recv_message
    assert_instance_of ZMQ::Message, recvd_msg
    assert_equal ZMQ::Frame("header"), recvd_msg.pop
  ensure
    ctx.destroy
  end

  def test_type_str
    ctx = ZMQ::Context.new
    sock = ctx.socket(:PAIR)
    assert_equal "PAIR", sock.type_str
  ensure
    ctx.destroy
  end

  def test_sock_options
    ctx = ZMQ::Context.new
    sock = ctx.socket(:PAIR)
    sock.verbose = true
    assert_equal 0, sock.hwm
    sock.hwm = 1000
    assert_equal 1000, sock.hwm

    assert_equal 0, sock.swap
    sock.swap = 1000
    assert_equal 1000, sock.swap

    assert_equal 0, sock.affinity
    sock.affinity = 1
    assert_equal 1, sock.affinity

    assert_equal 40000, sock.rate
    sock.rate = 50000
    assert_equal 50000, sock.rate

    assert_equal 10, sock.recovery_ivl
    sock.recovery_ivl = 20
    assert_equal 20, sock.recovery_ivl

    assert_equal(-1, sock.recovery_ivl_msec)
    sock.recovery_ivl_msec = 20
    assert_equal 20, sock.recovery_ivl_msec

    assert_equal true, sock.mcast_loop?
    sock.mcast_loop = false
    assert !sock.mcast_loop?

    assert_equal 0, sock.sndbuf
    sock.sndbuf = 1000
    assert_equal 1000, sock.sndbuf

    assert_equal 0, sock.rcvbuf
    sock.rcvbuf = 1000
    assert_equal 1000, sock.rcvbuf

    assert_equal(-1, sock.linger)
    sock.linger = 10
    assert_equal 10, sock.linger

    assert_equal 100, sock.backlog
    sock.backlog = 200
    assert_equal 200, sock.backlog

    assert_equal 100, sock.reconnect_ivl
    sock.reconnect_ivl = 200
    assert_equal 200, sock.reconnect_ivl

    assert_equal 0, sock.reconnect_ivl_max
    sock.reconnect_ivl_max = 5
    assert_equal 5, sock.reconnect_ivl_max

    assert_equal -1, sock.rcvtimeo
    sock.rcvtimeo = 200
    assert_equal 200, sock.rcvtimeo

    assert_equal -1, sock.sndtimeo
    sock.sndtimeo = 200
    assert_equal 200, sock.sndtimeo

    sock.identity = "anonymous"
    assert_raises ZMQ::Error do
      sock.identity = ""
    end
    assert_raises ZMQ::Error do
      sock.identity = ("*" * 256)
    end

    assert !sock.rcvmore?

    assert_equal 0, sock.events

    sub_sock = ctx.socket(:SUB)
    sub_sock.verbose = true
    sub_sock.subscribe("ruby")
    sub_sock.unsubscribe("ruby")
  ensure
    ctx.destroy
  end
end