require 'timeout' require 'open-uri' module Rhoconnect class NodeChannel class << self attr_accessor :redis,:redis2,:message_thread @message_thread = nil end RESULT_HASH = {} TIMEOUT = 30 # seconds PUB_CHANNEL = "#{$$}RedisSUB" # pub channel must link to redis sub channel SUB_CHANNEL = "#{$$}RedisPUB" # sub channel must link to redis pub channel @message_thread,@redis,@redis2 = nil class << self attr_accessor :thrd, :register_semaphore, :register_condition def get_timeout TIMEOUT end def redis url = Rhoconnect.redis.is_a?(Array) ? Rhoconnect.redis[0] : Rhoconnect.redis db_inst = RedisImpl.new db_inst.create(url) @redis ||= db_inst.db end def redis2 url = Rhoconnect.redis.is_a?(Array) ? Rhoconnect.redis[0] : Rhoconnect.redis db_inst = RedisImpl.new db_inst.create(url) @redis2 ||= db_inst.db end def exit_node NodeChannel.redis2.publish(PUB_CHANNEL,{:route => 'deregister'}.to_json) end def bootstrap @register_semaphore ||= Mutex.new @register_condition ||= ConditionVariable.new # Run in the main thread, we setup node thread and wait for it to # finish bootstrapping before main thread can continue if @message_thread @message_thread.join @message_thread = nil end @register_semaphore.synchronize do @message_thread = Thread.new{check_channel} @register_condition.wait(@register_semaphore) end end def publish_channel_and_wait(msg,curr_model) unique_id = UUIDTools::UUID.random_create.to_s.gsub(/\-/,"") msg.merge!(:request_id => unique_id) RESULT_HASH[unique_id] = {} RESULT_HASH[unique_id][:status] = 'waiting' NodeChannel.redis2.publish(PUB_CHANNEL,msg.to_json) NodeChannel.wait_for_result(unique_id,curr_model) end def check_channel NodeChannel.redis.subscribe(SUB_CHANNEL) do |on| on.message do |channel,msg| m = JSON.parse(msg) #puts "received message: #{m.inspect}" if m['exit'] == true NodeChannel.redis.unsubscribe end route_message(m) end end end def publish_channel(msg) NodeChannel.redis2.publish(PUB_CHANNEL,msg.to_json) end def wait_for_result(key,curr_model) begin Timeout::timeout(TIMEOUT) do while(RESULT_HASH[key][:status] == 'waiting') do if data = RESULT_HASH[key][:process_request] #do some logic and return data with memory RESULT_HASH[key][:process_result] = process_message(curr_model,data) RESULT_HASH[key][:process_request] = nil else sleep 0.001 end end end rescue Exception=>e RESULT_HASH[key][:status] = 'broken' RESULT_HASH[key][:result] = "exception #{e.message}\n#{e.backtrace}" puts "Timeout on wait, setting JavaScript result state to broken: #{e.inspect}" end #request waiting either timed out or returned response if RESULT_HASH[key][:status] == 'broken' or RESULT_HASH[key][:status] == 'waiting' res = RESULT_HASH.delete(key) elsif RESULT_HASH[key] =~ /JS ERROR/ res = RESULT_HASH.delete(key) raise Exception.new(res) else res = RESULT_HASH.delete(key) end res[:result] end def route_message(msg) case msg['route'] when 'request' RESULT_HASH[msg['request_id']][:process_result] = 'waiting' RESULT_HASH[msg['request_id']][:process_request] = msg wait_for_process_result(msg['request_id']) when 'response' return if RESULT_HASH[msg['request_id']] == nil if msg['error'] and msg['error'].size > 1 RESULT_HASH[msg['request_id']][:result] = "JS ERROR: #{msg['error']}" RESULT_HASH[msg['request_id']][:status] = 'done' else RESULT_HASH[msg['request_id']][:result] = msg["result"] RESULT_HASH[msg['request_id']][:status] = 'done' end when 'register' @register_semaphore.synchronize do begin register_routes(msg) rescue Exception => e puts "Error registering JavaScript routes: #{e.inspect}" puts e.backtrace.join("\n") raise e ensure @register_condition.signal end end end end def wait_for_process_result(key) Timeout::timeout(TIMEOUT){ while(RESULT_HASH[key][:process_result] == 'waiting') do sleep 0.001 end } publish_channel(RESULT_HASH[key][:process_result]) end def process_message(curr_model,data) #puts "proccessing msg #{data}" if curr_model.respond_to?(data['function']) if data['args'] and data['args'].size > 0 if(data['function'] == 'stash_result') curr_model.send('result=',data['args']) curr_model.send(data['function']) process_res = nil else res = curr_model.send(data['function'],data['args']) end else res = curr_model.send(data['function']) end else raise Exception.new("Method #{data['function']} not found in model #{curr_model.class.name.to_s}.") end process_res = res.is_a?(String) ? res : res.to_hash if res {:result=>process_res,:callback=>data['callback'],:request_id=>data['request_id'],:route=>'response'} end def register_routes(hsh) Rhoconnect::Controller::JsBase.register_routes(hsh) end end end end