lib/plezi/websockets/message_dispatch.rb in plezi-0.14.4 vs lib/plezi/websockets/message_dispatch.rb in plezi-0.14.5

- old
+ new

@@ -1,97 +1,135 @@ -require 'set' -require 'securerandom' -require 'yaml' module Plezi - module Base - module MessageDispatch - class << self - # Allows pub/sub drivers to attach to the message dispatch using `MessageDispatch.drivers << driver` - attr_reader :drivers - end - @drivers = [].to_set + module Base + # Websocket Message Dispatching Service, including the autoscaling driver control (at the moment Redis is the only builtin driver). + module MessageDispatch + # add class attribute accessors. + class << self + # Allows pub/sub drivers to attach to the message dispatch using `MessageDispatch.drivers << driver` + attr_reader :drivers + end + @drivers = [].to_set - module_function + module_function - @ppid = ::Process.pid + # The YAML safe types used by Plezi + SAFE_TYPES = [Symbol, Date, Time, Encoding, Struct, Regexp, Range, Set].freeze + # a single use empty array (prevents the use of temporary objects where possible) + EMPTY_ARGS = [].freeze + # keeps track of the current process ID + @ppid = ::Process.pid + # returns a Plezi flavored pid UUID, used to set the pub/sub channel when scaling + def pid + process_pid = ::Process.pid + if @ppid != process_pid + @pid = nil + @ppid = process_pid + end + @pid ||= SecureRandom.urlsafe_base64.tap { |str| @prefix_len = str.length } + end - def pid - if(@ppid != ::Process.pid) - @pid = nil - @ppid = ::Process.pid - end - @pid ||= SecureRandom.urlsafe_base64.tap { |str| @prefix_len = str.length } - end + # initializes the drivers when possible. + def _init + @drivers.each(&:connect) + end - def _init - @drivers.each(&:connect) - end + # Pushes a message to the Pub/Sub drivers + def push(message) + # message[:type] = message[:type].name if message[:type].is_a?(Class) + message[:origin] = pid + hst = message.delete(:host) || Plezi.app_name + yml = message.to_yaml + @drivers.each { |drv| drv.push(hst, yml) } + end - def push(message) - # message[:type] = message[:type].name if message[:type].is_a?(Class) - message[:origin] = pid - hst = message.delete(:host) || Plezi.app_name - yml = message.to_yaml - @drivers.each { |d| d.push(hst, yml) } - end + # Parses a text message received through a Pub/Sub service. + def <<(msg) + msg = YAML.safe_load(msg, SAFE_TYPES) + return if msg[:origin] == pid + target_type = msg[:type] || :all + event = msg[:event] + if (target = msg[:target]) + Iodine::Websocket.defer(target2uuid(target)) { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[event], *(msg[:args]))) if ws._pl_ws_map[event] } + return + end + if target_type == :all + Iodine::Websocket.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[event], *(msg[:args]))) if ws._pl_ws_map[event] } + return + end + if event == :write2everyone + return unless msg[:data] + mth = msg[:method] + if(mth) + target_type = Object.const_get target_type + mth = target_type.method(mth) + return unless mth + Iodine::Websocket.each_write msg[:data], &mth + else + Iodine::Websocket.each_write msg[:data] + end + return + end + target_type = Object.const_get target_type + if target_type._pl_ws_map[event] + Iodine::Websocket.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[event], *(msg[:args]))) if ws.is_a?(target_type) } + return + end - def <<(msg) - @safe_types ||= [Symbol, Date, Time, Encoding, Struct, Regexp, Range, Set].freeze - msg = YAML.safe_load(msg, @safe_types) - return if msg[:origin] == pid - msg[:type] ||= msg['type'.freeze] - msg[:type] = Object.const_get msg[:type] if msg[:type] && msg[:type] != :all - if msg[:target] ||= msg['target'.freeze] - Iodine::Websocket.defer(target2uuid(msg[:target])) { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[msg[:event]], *(msg[:args] ||= msg['args'.freeze] || []))) if ws._pl_ws_map[msg[:event] ||= msg['event'.freeze]] } - elsif (msg[:type]) == :all - Iodine::Websocket.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[msg[:event]], *(msg[:args] ||= msg['args'.freeze] || []))) if ws._pl_ws_map[msg[:event] ||= msg['event'.freeze]] } - else - Iodine::Websocket.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[msg[:event]], *(msg[:args] ||= msg['args'.freeze] || []))) if ws.is_a?(msg[:type]) && msg[:type]._pl_ws_map[msg[:event] ||= msg['event'.freeze]] } - end + rescue => e + puts '*** The following could be a security breach attempt:', e.message, e.backtrace + nil + end - rescue => e - puts '*** The following could be a security breach attempt:', e.message, e.backtrace - nil - end + # Sends a message to a specific target, if it's on this machine, otherwise forwards the message to the Pub/Sub. + def unicast(_sender, target, meth, args) + return false if target.nil? + if (tuuid = target2uuid(target)) + Iodine::Websocket.defer(tuuid) { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws._pl_ws_map[meth] } + return true + end + push target: target, args: args, host: target2pid(target) + end - def unicast(_sender, target, meth, args) - return false if target.nil? - if (tuuid = target2uuid) - Iodine::Websocket.defer(tuuid) { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws._pl_ws_map[meth] } - return true - end - push target: target, args: args, host: target2pid(target) - end + # Sends a message to a all targets of a speific **type**, as well as pushing the message to the Pub/Sub drivers. + def broadcast(sender, meth, args) + target_type = nil + if sender.is_a?(Class) + target_type = sender + sender = Iodine::Websocket + else + target_type = sender.class + end + sender.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws.is_a?(target_type) && ws._pl_ws_map[meth] } + push type: target_type.name, args: args, event: meth + end - def broadcast(sender, meth, args) - if sender.is_a?(Class) - Iodine::Websocket.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws.is_a?(sender) && ws._pl_ws_map[meth] } - push type: sender.name, args: args, event: meth - else - sender.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws.is_a?(sender.class) && ws._pl_ws_map[meth] } - push type: sender.class.name, args: args, event: meth - end - end + # Sends a message to a all existing websocket connections, as well as pushing the message to the Pub/Sub drivers. + def multicast(sender, meth, args) + sender = Iodine::Websocket if sender.is_a?(Class) + sender.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws._pl_ws_map[meth] } + push type: :all, args: args, event: meth + end - def multicast(sender, meth, args) - if sender.is_a?(Class) - Iodine::Websocket.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws._pl_ws_map[meth] } - push type: :all, args: args, event: meth - else - sender.each { |ws| ws._pl_ad_review(ws.__send__(ws._pl_ws_map[meth], *args)) if ws._pl_ws_map[meth] } - push type: :all, args: args, event: meth - end - end + # Writes directly to all clients of all controllers. + def write2everyone(sender, data, filter_owner = nil, filter_name = nil) + sender = Iodine::Websocket if sender.is_a?(Class) + mth = nil + raise TypeError, "Plezi\#write_each filter error - method doesn't exist? #{filter_owner}.#{filter_name}" if(filter_owner && !(mth = filter_owner.method(filter_name))) + mth ? sender.each_write(data, &mth) : sender.each_write(data) + push event: :write2everyone, data: data, type: (filter_owner || Iodine::Websocket).name, method: (filter_owner && filter_name) + end - def target2uuid(target) - return nil unless target.start_with? pid - target[@prefix_len..-1].to_i - end + # Converts a target Global UUID to a localized UUID + def target2uuid(target) + return nil unless target.start_with? pid + target[@prefix_len..-1].to_i + end - def target2pid(target) - target ? target[0..(@prefix_len - 1)] : Plezi.app_name + # Extracts the machine part from a target's Global UUID + def target2pid(target) + target ? target[0..(@prefix_len - 1)] : Plezi.app_name + end end - end - end + end end # connect default drivers require 'plezi/websockets/redis'