#!/usr/bin/env ruby

$:.unshift('../lib')
require 'gearman'
require 'gearman/testlib'
require 'test/unit'
require 'thread'

Thread.abort_on_exception = true

class TestWorker < Test::Unit::TestCase
  def setup
    @server = FakeJobServer.new(self)
    @server2 = FakeJobServer.new(self)
  end

  def teardown
    @server.stop
    @server2.stop
  end

  def test_complete
    @server = FakeJobServer.new(self)
    worker = nil
    sock = nil

    s = TestScript.new
    w = TestScript.new

    server_thread = Thread.new { s.loop_forever }.run
    worker_thread = Thread.new { w.loop_forever }.run

    # Create a worker and wait for it to connect to us.
    w.exec {
      worker = Gearman::Worker.new(
        "localhost:#{@server.port}", { :client_id => 'test' })
    }
    s.exec { sock = @server.expect_connection }
    s.wait

    # After it connects, it should send its ID, and it should tell us its
    # abilities when we report them.
    s.exec { @server.expect_request(sock, :set_client_id, 'test') }
    w.exec do
      worker.add_ability('echo') do |data, job| 
        job.report_status(1, 1);
        part1, part2 = data.split(//, 2)
        job.send_data(part1) # send partial data first
        part2
      end
    end
    s.exec { @server.expect_request(sock, :can_do, 'echo') }

    # It should try to grab a job when we tell it to work.
    w.exec { worker.work }
    s.exec { @server.expect_request(sock, :grab_job) }

    # If we tell it there aren't any jobs, it should go to sleep.
    s.exec { @server.send_response(sock, :no_job) }
    s.exec { @server.expect_request(sock, :pre_sleep) }

    # When we send it a noop, it should wake up and ask for a job again.
    s.exec { @server.send_response(sock, :noop) }
    s.exec { @server.expect_request(sock, :grab_job) }

    # When we give it a job, it should do it.
    s.exec { @server.send_response(sock, :job_assign, "a\0echo\0foo") }
    s.exec { @server.expect_request(sock, :work_status, "a\0001\0001") }
    s.exec { @server.expect_request(sock, :work_data, "a\000f") }
    s.exec { @server.expect_request(sock, :work_complete, "a\0oo") }

    # Test that functions are unregistered correctly.
    w.exec { worker.remove_ability('echo') }
    s.exec { @server.expect_request(sock, :cant_do, 'echo') }
    s.wait
  end

  def test_multiple_servers
    # This is cheesy.  We want to know the order that Worker#work will
    # iterate through the servers, so we make sure that server1 will be the
    # first one when the names are lexographically sorted.
    if @server2.port.to_s < @server.port.to_s
      tmp = @server
      @server = @server2
      @server2 = tmp
    end
    worker = nil
    sock1, sock2 = nil

    s1 = TestScript.new
    s2 = TestScript.new
    w = TestScript.new

    @server_thread = Thread.new { s1.loop_forever }.run
    @server2_thread = Thread.new { s2.loop_forever }.run
    worker_thread = Thread.new { w.loop_forever }.run

    # Create a worker, which should connect to both servers.
    w.exec {
      worker = Gearman::Worker.new(
        nil, { :client_id => 'test', :reconnect_sec => 0.1 }) }
    w.exec { worker.add_ability('foo') {|d,j| 'bar' } }
    w.exec {
      worker.job_servers =
        [ "localhost:#{@server.port}", "localhost:#{@server2.port}" ]
    }
    s1.exec { sock1 = @server.expect_connection }
    s2.exec { sock2 = @server2.expect_connection }
    s1.wait
    s2.wait

    # It should register itself with both.
    s1.exec { @server.expect_request(sock1, :set_client_id, 'test') }
    s1.exec { @server.expect_request(sock1, :can_do, 'foo') }
    s2.exec { @server2.expect_request(sock2, :set_client_id, 'test') }
    s2.exec { @server2.expect_request(sock2, :can_do, 'foo') }

    # It should try to get a job from both servers and then sleep.
    w.exec { worker.work }
    s1.exec { @server.expect_request(sock1, :grab_job) }
    s1.exec { @server.send_response(sock1, :no_job) }
    s2.exec { @server2.expect_request(sock2, :grab_job) }
    s2.exec { @server2.send_response(sock2, :no_job) }
    s1.exec { @server.expect_request(sock1, :pre_sleep) }
    s2.exec { @server2.expect_request(sock2, :pre_sleep) }

    # If the second server wakes it up, it should again try to get a job
    # and then do it.
    s2.exec { @server2.send_response(sock2, :noop) }
    s1.exec { @server.expect_request(sock1, :grab_job) }
    s1.exec { @server.send_response(sock1, :no_job) }
    s2.exec { @server2.expect_request(sock2, :grab_job) }
    s2.exec { @server2.send_response(sock2, :job_assign, "a\0foo\0") }
    s2.exec { @server2.expect_request(sock2, :work_complete, "a\0bar") }

    w.wait
    s1.wait
    s2.wait

    # Stop the first job server and make the worker try to reconnect to
    # both.
    old_servers = worker.job_servers
    @server.stop
    worker.job_servers = []
    worker.job_servers = old_servers
    s2.exec { sock2 = @server2.expect_connection }
    s2.wait

    # It shouldn't have any trouble with the second server.  Tell it to go
    # to work.
    s2.exec { @server2.expect_request(sock2, :set_client_id, 'test') }
    s2.exec { @server2.expect_request(sock2, :can_do, 'foo') }
    w.exec { worker.work }
    s2.exec { @server2.expect_request(sock2, :grab_job) }
    s2.exec { @server2.send_response(sock2, :no_job) }
    s2.exec { @server2.expect_request(sock2, :pre_sleep) }
    s2.wait

    # Start the first server and wait for the worker to connect to it and
    # register.
    @server.start
    s1.exec { sock1 = @server.expect_connection }
    s1.wait
    s1.exec { @server.expect_request(sock1, :set_client_id, 'test') }
    s1.exec { @server.expect_request(sock1, :can_do, 'foo') }
    s1.wait

    # Let the second server wake the worker up and then give it a job.
    s2.exec { @server2.send_response(sock2, :noop) }
    s1.exec { @server.expect_request(sock1, :grab_job) }
    s1.exec { @server.send_response(sock1, :no_job) }
    s2.exec { @server2.expect_request(sock2, :grab_job) }
    s2.exec { @server2.send_response(sock2, :job_assign, "a\0foo\0") }
    s2.exec { @server2.expect_request(sock2, :work_complete, "a\0bar") }
    s1.wait
    s2.wait
    w.wait
  end

  def test_timeout
    worker = nil
    sock = nil

    s = TestScript.new
    w = TestScript.new

    server_thread = Thread.new { s.loop_forever }.run
    worker_thread = Thread.new { w.loop_forever }.run

    w.exec {
      worker = Gearman::Worker.new("localhost:#{@server.port}",
        { :client_id => 'test',
          :reconnect_sec => 0.15,
          :network_timeout_sec => 0.1 })
    }
    s.exec { sock = @server.expect_connection }
    s.wait
    s.exec { @server.expect_request(sock, :set_client_id, 'test') }

    w.exec { worker.add_ability('foo') {|d,j| 'bar' } }
    s.exec { @server.expect_request(sock, :can_do, 'foo') }

    # Don't do anything after the client asks for a job.
    w.exec { worker.work }
    s.exec { @server.expect_request(sock, :grab_job) }
    s.exec { sleep 0.16 }
    s.wait

    # The client should reconnect and ask for a job again.
    s.exec { sock = @server.expect_connection }
    s.wait

    s.exec { @server.expect_request(sock, :set_client_id, 'test') }
    s.exec { @server.expect_request(sock, :can_do, 'foo') }
    s.exec { @server.expect_request(sock, :grab_job) }
    s.exec { @server.send_response(sock, :job_assign, "a\0foo\0") }
    s.exec { @server.expect_request(sock, :work_complete, "a\0bar") }
    s.wait
    w.wait
  end

  def test_exception
    @server = FakeJobServer.new(self)
    worker = nil
    sock = nil

    s = TestScript.new
    w = TestScript.new

    server_thread = Thread.new { s.loop_forever }.run
    worker_thread = Thread.new { w.loop_forever }.run

    # Create a worker and wait for it to connect to us.
    w.exec {
      worker = Gearman::Worker.new(
        "localhost:#{@server.port}", { :client_id => 'test' })
    }
    s.exec { sock = @server.expect_connection }
    s.wait

    # After it connects, it should send its ID, and it should tell us its
    # abilities when we report them.
    s.exec { @server.expect_request(sock, :set_client_id, 'test') }
    w.exec { worker.add_ability('echo') {|d,j| raise Exception.new("fooexception") } }
    s.exec { @server.expect_request(sock, :can_do, 'echo') }

    # It should try to grab a job when we tell it to work.
    w.exec { worker.work }
    s.exec { @server.expect_request(sock, :grab_job) }

    # If we tell it there aren't any jobs, it should go to sleep.
    s.exec { @server.send_response(sock, :no_job) }
    s.exec { @server.expect_request(sock, :pre_sleep) }

    # When we send it a noop, it should wake up and ask for a job again.
    s.exec { @server.send_response(sock, :noop) }
    s.exec { @server.expect_request(sock, :grab_job) }

    # When we give it a job, it should raise an excpetion and notify the server
    s.exec { @server.send_response(sock, :job_assign, "a\0echo\0foo") }
    s.exec do
      @server.expect_request(sock, :work_warning, "a\0fooexception")
      @server.expect_request(sock, :work_fail, "a")
    end

    s.wait
  end
  
  def test_worker_enabled
    @server = FakeJobServer.new(self)
    worker = nil
    sock = nil
    @result = nil

    s = TestScript.new
    w = TestScript.new

    server_thread = Thread.new { s.loop_forever }.run
    worker_thread = Thread.new { w.loop_forever }.run

    # Create a worker and wait for it to connect to us.
    w.exec {
      worker = Gearman::Worker.new(
        "localhost:#{@server.port}", { :client_id => 'test' })
    }
    s.exec { sock = @server.expect_connection }
    s.wait

    # After it connects, it should send its ID, and it should tell us its
    # abilities when we report them.
    s.exec { @server.expect_request(sock, :set_client_id, 'test') }
    w.exec do
      worker.add_ability('echo') do |data, job| 
        job.report_status(1, 1);
        part1, part2 = data.split(//, 2)
        job.send_data(part1) # send partial data first
        part2
      end
    end
    s.exec { @server.expect_request(sock, :can_do, 'echo') }

    # When we set to false worker_enabled, worker.work shall return false
    s.exec { @server.send_response(sock, :job_assign, "a\0echo\0foo") }
    w.exec { worker.worker_enabled = false; @result = worker.work }
    w.wait
    assert_equal false, @result

    # When we set to true worker_enabled, worker.work shall return true
    s.exec { @server.send_response(sock, :job_assign, "a\0echo\0foo") }
    w.exec { worker.worker_enabled = true; @result = worker.work }
    w.wait
    assert_equal true, @result

    # when there are no jobs pending, and set to false worker_enabled, worker.work shall return false
    s.exec { @server.send_response(sock, :no_job) }
    w.exec { worker.worker_enabled = false; @result = worker.work }
    w.wait
    assert_equal false, @result
  end
  
end