#!/usr/bin/env ruby

$:.unshift('../lib')
require 'gearman'
require 'gearman/testlib'
require 'test/unit'
require 'thread'

# Re-raise in the main thread any exception that kills a background thread,
# so a failure inside one of the threads started by the tests below is not
# silently lost.
Thread.abort_on_exception = true

##
# Tests for the Gearman client classes (Gearman::Client, Gearman::Task and
# Gearman::TaskSet), run against one or more FakeJobServer instances.
#
# Every test drives the client side and each fake server from its own
# TestScript, and each script runs its loop in a separate thread (see
# #start_script).  Steps are handed to a script with +exec+ and the test
# synchronizes with a script through +wait+.
#
# NOTE(review): TestScript and FakeJobServer are defined in gearman/testlib,
# not in this file.  Presumably +exec+ queues a block for the script's thread
# and +wait+ blocks until the queued blocks have run -- confirm there before
# relying on it.
class TestClient < Test::Unit::TestCase
  ##
  # Do a simple test of the functionality of the client code: two tasks are
  # added to one task set and each must complete with its own result.
  def test_client
    server = FakeJobServer.new(self)
    # Declared up front so that the blocks handed to the scripts can assign
    # to these locals and the assertions at the end can read them.
    client, task1, task2, taskset, sock, res1, res2 = nil

    s = start_script
    c = start_script

    c.exec { client = Gearman::Client.new("localhost:#{server.port}") }

    c.exec { task1 = Gearman::Task.new('add', '5 2') }
    c.exec { task1.on_complete {|d| res1 = d.to_i } }
    c.exec { taskset = Gearman::TaskSet.new(client) }
    c.exec { taskset.add_task(task1) }
    s.exec { sock = server.expect_connection }
    s.wait

    s.exec { server.expect_request(sock, :submit_job, "add\000\0005 2") }
    s.exec { server.send_response(sock, :job_created, 'a') }

    # Create a second task.  It should use the same socket as the first.
    c.exec { task2 = Gearman::Task.new('add', '10 5') }
    c.exec { task2.on_complete {|d| res2 = d.to_i } }
    c.exec { taskset.add_task(task2) }

    # Return the response to the first job before the handle for the
    # second.
    s.exec { server.send_response(sock, :work_complete, "a\0007") }
    s.exec { server.expect_request(sock, :submit_job, "add\000\00010 5") }
    s.exec { server.send_response(sock, :job_created, 'b') }

    # After the client waits on the taskset, send the response to the
    # second job.
    c.exec { taskset.wait }
    s.exec { server.send_response(sock, :work_complete, "b\00015") }
    c.wait
    s.wait

    # Check that we got the right answers.
    assert_equal(7, res1)
    assert_equal(15, res2)
  end

  ##
  # Test Client#do_task: it should return the job's data when the server
  # reports work_complete and nil when the server reports work_fail.
  def test_do_task
    server = FakeJobServer.new(self)
    client, sock, res = nil

    s = start_script
    c = start_script

    c.exec { client = Gearman::Client.new("localhost:#{server.port}") }

    c.exec { res = client.do_task('add', '5 2').to_i }
    s.exec { sock = server.expect_connection }
    s.wait

    s.exec { server.expect_request(sock, :submit_job, "add\000\0005 2") }
    s.exec { server.send_response(sock, :job_created, 'a') }
    s.exec { server.send_response(sock, :work_complete, "a\0007") }
    c.wait
    s.wait

    assert_equal(7, res)

    # A failed job must overwrite the previous result (7) with nil.
    c.exec { res = client.do_task('add', '1 2') }
    s.exec { server.expect_request(sock, :submit_job, "add\000\0001 2") }
    s.exec { server.send_response(sock, :job_created, 'a') }
    s.exec { server.send_response(sock, :work_fail, 'a') }
    c.wait
    s.wait

    assert_nil(res)
  end

  ##
  # Test that Gearman::Task's callbacks get called when they should.
  def test_callbacks
    server = FakeJobServer.new(self)
    client, task, taskset, sock = nil
    failed, retries, num, den = nil

    s = start_script
    c = start_script

    c.exec { client = Gearman::Client.new("localhost:#{server.port}") }

    # Each callback records what it was given so it can be asserted on below.
    task = Gearman::Task.new('foo', 'bar',
      { :retry_count => 2 })
    task.on_fail { failed = true }
    task.on_retry {|r| retries = r }
    task.on_status {|n,d| num = n.to_i; den = d.to_i }

    c.exec { taskset = Gearman::TaskSet.new(client) }
    c.exec { taskset.add_task(task) }
    s.exec { sock = server.expect_connection }
    s.wait

    # Send three failures back to the client.  The job is expected to be
    # resubmitted after each of the first two; the third attempt also
    # reports a status update before failing.
    c.exec { taskset.wait }
    s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
    s.exec { server.send_response(sock, :job_created, 'a') }
    s.exec { server.send_response(sock, :work_fail, 'a') }
    s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
    s.exec { server.send_response(sock, :job_created, 'b') }
    s.exec { server.send_response(sock, :work_fail, 'b') }
    s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
    s.exec { server.send_response(sock, :job_created, 'c') }
    s.exec { server.send_response(sock, :work_status, "c\0001\0002") }
    s.exec { server.send_response(sock, :work_fail, 'c') }
    c.wait
    s.wait

    assert_equal(true, failed)
    assert_equal(2, retries)
    assert_equal(1, num)
    assert_equal(2, den)
  end

  ##
  # Test a task set in which one task completes and the other fails: only
  # the matching callback of each task should fire, and TaskSet#wait should
  # return false.
  def test_failure
    server = FakeJobServer.new(self)
    client, task1, task2, taskset, sock = nil
    res1, res2, fail1, fail2, setres = nil

    s = start_script
    c = start_script

    c.exec { client = Gearman::Client.new("localhost:#{server.port}") }

    c.exec { task1 = Gearman::Task.new('func1', 'a') }
    c.exec { task1.on_complete {|d| res1 = d } }
    c.exec { task1.on_fail { fail1 = true } }
    c.exec { taskset = Gearman::TaskSet.new(client) }
    c.exec { taskset.add_task(task1) }
    s.exec { sock = server.expect_connection }
    s.wait

    s.exec { server.expect_request(sock, :submit_job, "func1\000\000a") }
    s.exec { server.send_response(sock, :job_created, 'a') }

    c.exec { task2 = Gearman::Task.new('func2', 'b') }
    c.exec { task2.on_complete {|d| res2 = d } }
    c.exec { task2.on_fail { fail2 = true } }
    c.exec { taskset.add_task(task2) }

    s.exec { server.expect_request(sock, :submit_job, "func2\000\000b") }
    s.exec { server.send_response(sock, :job_created, 'b') }

    # The first job succeeds, the second one fails.
    s.exec { server.send_response(sock, :work_complete, "a\000a1") }
    s.exec { server.send_response(sock, :work_fail, "b") }

    c.exec { setres = taskset.wait }
    c.wait
    s.wait

    assert_equal('a1', res1)
    assert_nil(res2)
    assert_nil(fail1)
    assert_equal(true, fail2)
    assert_equal(false, setres)
  end

  ##
  # Test that user-supplied uniq values are handled correctly.
  def test_uniq
    server1 = FakeJobServer.new(self)
    server2 = FakeJobServer.new(self)
    client = nil
    sock1, sock2 = nil
    taskset = nil
    task1, task2, task3, task4 = nil
    res1, res2, res3, res4 = nil
    hostport1 = "localhost:#{server1.port}"
    hostport2 = "localhost:#{server2.port}"

    s1 = start_script
    s2 = start_script
    c = start_script

    c.exec { client = Gearman::Client.new }
    c.exec { client.job_servers = [hostport1, hostport2] }
    c.exec { taskset = Gearman::TaskSet.new(client) }

    # Submit a task with uniq key 'u' to the first server.
    c.exec { client.test_hostport = hostport1 }
    c.exec { task1 = Gearman::Task.new('func1', 'arg', { :uniq => 'u' }) }
    c.exec { task1.on_complete {|d| res1 = d.to_i } }
    c.exec { taskset.add_task(task1) }

    s1.exec { sock1 = server1.expect_connection }
    s1.wait

    s1.exec { server1.expect_request(
      sock1, :submit_job, "func1\000#{'u'.hash}\000arg") }
    s1.exec { server1.send_response(sock1, :job_created, 'a') }

    # If we submit a second task with the same key, it should get sent to
    # the same server.
    c.exec { client.test_hostport = hostport2 }
    c.exec { task2 = Gearman::Task.new('func1', 'arg2', { :uniq => 'u' }) }
    c.exec { task2.on_complete {|d| res2 = d.to_i } }
    c.exec { taskset.add_task(task2) }

    s1.exec { server1.expect_request(
      sock1, :submit_job, "func1\000#{'u'.hash}\000arg2") }
    s1.exec { server1.send_response(sock1, :job_created, 'a') }

    # When we create a task with key 'a', it should go to the second
    # server.
    c.exec { task3 = Gearman::Task.new('func1', 'arg', { :uniq => 'a' }) }
    c.exec { task3.on_complete {|d| res3 = d.to_i } }
    c.exec { taskset.add_task(task3) }

    s2.exec { sock2 = server2.expect_connection }
    s2.wait

    s2.exec { server2.expect_request(
      sock2, :submit_job, "func1\000#{'a'.hash}\000arg") }
    s2.exec { server2.send_response(sock2, :job_created, 'b') }

    # If we tell the client to use the first server again and create
    # another job with no uniq key, it should go back to the first server.
    c.exec { client.test_hostport = hostport1 }
    c.exec { task4 = Gearman::Task.new('func1', 'arg') }
    c.exec { task4.on_complete {|d| res4 = d.to_i } }
    c.exec { taskset.add_task(task4) }

    s1.exec { server1.expect_request(
      sock1, :submit_job, "func1\000\000arg") }
    s1.exec { server1.send_response(sock1, :job_created, 'c') }

    # Send back responses for all the handles we've handed out and make
    # sure that we got what we expected.  Tasks 1 and 2 were both given
    # handle 'a', so they are expected to receive the same result.
    c.exec { taskset.wait }
    s1.exec { server1.send_response(sock1, :work_complete, "a\0001") }
    s2.exec { server2.send_response(sock2, :work_complete, "b\0002") }
    s1.exec { server1.send_response(sock1, :work_complete, "c\0003") }

    c.wait
    s1.wait
    s2.wait

    assert_equal(1, res1)
    assert_equal(1, res2)
    assert_equal(2, res3)
    assert_equal(3, res4)
  end

  ##
  # Test that '-' uniq values work correctly.
  def test_uniq_dash
    server1 = FakeJobServer.new(self)
    server2 = FakeJobServer.new(self)
    client, taskset, sock1, sock2 = nil
    task1, task2, task3 = nil
    res1, res2, res3 = nil
    hostport1 = "localhost:#{server1.port}"
    hostport2 = "localhost:#{server2.port}"

    s1 = start_script
    s2 = start_script
    c = start_script

    c.exec { client = Gearman::Client.new }
    c.exec { client.job_servers = [hostport1, hostport2] }
    c.exec { taskset = Gearman::TaskSet.new(client) }

    # The first task uses uniq = '-' with the argument 'arg'.
    c.exec { client.test_hostport = hostport1 }
    c.exec { task1 = Gearman::Task.new('func1', 'arg', { :uniq => '-' }) }
    c.exec { task1.on_complete {|d| res1 = d.to_i } }
    c.exec { taskset.add_task(task1) }

    s1.exec { sock1 = server1.expect_connection }
    s1.wait

    s1.exec { server1.expect_request(
      sock1, :submit_job, "func1\000#{'arg'.hash}\000arg") }
    s1.exec { server1.send_response(sock1, :job_created, 'a') }

    # The second task uses the same arg, so it should be merged with the
    # first by the server (and also be executed on the first server, even
    # though we've changed the client to use the second by default).
    c.exec { client.test_hostport = hostport2 }
    c.exec { task2 = Gearman::Task.new('func1', 'arg', { :uniq => '-' }) }
    c.exec { task2.on_complete {|d| res2 = d.to_i } }
    c.exec { taskset.add_task(task2) }

    s1.exec { server1.expect_request(
      sock1, :submit_job, "func1\000#{'arg'.hash}\000arg") }
    s1.exec { server1.send_response(sock1, :job_created, 'a') }

    # The third task uses 'arg2', so it should not be merged and instead
    # run on the second server.
    c.exec { task3 = Gearman::Task.new('func1', 'arg2', { :uniq => '-' }) }
    c.exec { task3.on_complete {|d| res3 = d.to_i } }
    c.exec { taskset.add_task(task3) }

    s2.exec { sock2 = server2.expect_connection }
    s2.wait

    s2.exec { server2.expect_request(
      sock2, :submit_job, "func1\000#{'arg2'.hash}\000arg2") }
    s2.exec { server2.send_response(sock2, :job_created, 'b') }

    # Send back results for the two handles that we've handed out.
    c.exec { taskset.wait }
    s1.exec { server1.send_response(sock1, :work_complete, "a\0001") }
    s2.exec { server2.send_response(sock2, :work_complete, "b\0002") }

    c.wait
    s1.wait
    s2.wait

    assert_equal(1, res1)
    assert_equal(1, res2)
    assert_equal(2, res3)
  end

  ##
  # Test that NUL bytes in returned data are preserved.
  def test_nuls_in_data
    server = FakeJobServer.new(self)
    client, sock, res = nil

    s = start_script
    c = start_script

    c.exec { client = Gearman::Client.new("localhost:#{server.port}") }

    c.exec { res = client.do_task('foo', nil) }
    s.exec { sock = server.expect_connection }
    s.wait

    s.exec { server.expect_request(sock, :submit_job, "foo\000\000") }
    s.exec { server.send_response(sock, :job_created, 'a') }
    # Only the first NUL separates the handle from the data; the ones after
    # it belong to the data and must come back unchanged.
    s.exec { server.send_response(sock, :work_complete, "a\0001\0002\0003") }
    c.wait
    s.wait

    assert_equal("1\0002\0003", res)
  end

  ##
  # Test that clients time out when the server sends a partial packet and
  # then hangs.
  def test_read_timeouts
    server = FakeJobServer.new(self)
    client, sock, task, taskset, res = nil

    s = start_script
    c = start_script

    c.exec { client = Gearman::Client.new("localhost:#{server.port}") }

    # First, create a new task.  The server claims to be sending back a
    # packet with 1 byte of data, but actually sends an empty packet.  The
    # client should time out after 0.1 sec.
    c.exec { taskset = Gearman::TaskSet.new(client) }
    c.exec { task = Gearman::Task.new('foo', 'bar') }
    c.exec { client.task_create_timeout_sec = 0.1 }
    c.exec { res = taskset.add_task(task) }
    s.exec { sock = server.expect_connection }
    s.wait

    s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
    s.exec { server.send_response(sock, :job_created, '', 1) }
    c.wait
    s.wait

    assert_equal(false, res)

    # Now create a task, but only return a partial packet for
    # work_complete.  The client should again time out after 0.1 sec.
    c.exec { res = taskset.add_task(task) }
    s.exec { sock = server.expect_connection }
    s.wait

    s.exec { server.expect_request(sock, :submit_job, "foo\000\000bar") }
    s.exec { server.send_response(sock, :job_created, 'a') }
    c.exec { res = taskset.wait(0.1) }
    s.exec { server.send_response(sock, :work_complete, "a\000", 3) }
    c.wait
    s.wait

    assert_equal(false, res)
  end

  private

  ##
  # Create a TestScript and start a background thread that runs its
  # +loop_forever+ method.  Returns the script; the thread itself is not
  # kept, since no test ever joins or stops it.
  def start_script
    script = TestScript.new
    # Thread#run wakes the new thread and makes it eligible for scheduling.
    Thread.new { script.loop_forever }.run
    script
  end
end