Sha256: ce550cff9609ec32da336f992a2837f95a848f8d9f14c90208334d2ca3b5b8da

Contents?: true

Size: 1.26 KB

Versions: 1

Compression:

Stored size: 1.26 KB

Contents

require 'drbqs/connection'
require 'drbqs/task_client'

module DRbQS

  class Client
    def initialize(access_uri, opts = {})
      @access_uri = access_uri
      @logger = Logger.new(opts[:log_file] || 'drbqs_client.log')
      @logger.level = opts[:log_level] || Logger::ERROR
      @connection = nil
      @task_client = nil
    end

    def execute_task(marshal_obj, method_sym, args)
      obj = Marshal.load(marshal_obj)
      obj.__send__(method_sym, *args)
    end
    private :execute_task

    def connect
      obj = DRbObject.new_with_uri(@access_uri)
      @connection = ConnectionClient.new(obj[:message], @logger)
      node_id = @connection.get_id
      @task_client = TaskClient.new(node_id, obj[:queue], obj[:result], @logger)
      if ary = @connection.get_initialization
        execute_task(*ary)
      end
    end

    def calculate
      cn = Thread.new do
        loop do
          @task_client.add_new_task
          @connection.respond_alive_signal
          @task_client.send_result
          sleep(1)
        end
      end
      exec = Thread.new do
        loop do
          marshal_obj, method_sym, args = @task_client.get
          @task_client.transmit(execute_task(marshal_obj, method_sym, args))
        end
      end
      cn.join
    end
  end

end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
drbqs-0.0.1 lib/drbqs/client.rb