lib/legs.rb in Bluebie-legs-0.6 vs lib/legs.rb in Bluebie-legs-0.6.1

- old
+ new

@@ -1,141 +1,119 @@ -# Legs take you places, a networking companion to Shoes -require 'rubygems' +# Legs take you places, a networking companion +['rubygems', 'socket', 'thread'].each { |i| require i } require 'json' unless self.class.const_defined? 'JSON' -require 'socket' -require 'thread' -Thread.abort_on_exception = true # Should be able to run without this, hopefully. Helps with debugging though - class Legs + # general getters attr_reader :socket, :parent, :meta + def inspect; "<Legs:#{object_id} Meta: #{@meta.inspect}>"; end # Legs.new for a client, subclass to make a server, .new then makes server and client! def initialize(host = 'localhost', port = 30274) self.class.start(port) if self.class != Legs && !self.class.started? ObjectSpace.define_finalizer(self) { self.close! } - @socket = TCPSocket.new(host, port) and @parent = false if host.instance_of?(String) - @socket = host and @parent = port if host.instance_of?(TCPSocket) - @responses = Hash.new; @meta = {}; @closed = false + @parent = false; @responses = Hash.new; @meta = {}; @disconnected = false @responses_mutex = Mutex.new; @socket_mutex = Mutex.new + if host.instance_of?(TCPSocket) + @socket = host + @parent = port unless port.instance_of?(Numeric) + elsif host.instance_of?(String) + @socket = TCPSocket.new(host, port) + self.class.outgoing_mutex.synchronize { self.class.outgoing.push self } + else + raise "First argument needs to be a hostname, ip, or socket" + end + + @handle_data = Proc.new do |data| - data = self.__json_restore(JSON.parse(data)) + data = json_restore(JSON.parse(data)) - if data['method'] == '**remote__disconnecting**' - self.close! - elsif @parent and data['method'] - @parent.__data!(data, self) + if data['method'] + (@parent || self.class).__data!(data, self) elsif data['error'] and data['id'].nil? raise data['error'] else @responses_mutex.synchronize { @responses[data['id']] = data } end end @thread = Thread.new do - while connected? + until @socket.closed? begin - self.close! if @socket.eof? + close! if @socket.eof? data = nil - @socket_mutex.synchronize { data = @socket.gets(self.class.terminator) } + @socket_mutex.synchronize { data = @socket.gets(self.class.terminator) rescue nil } if data.nil? - self.close! + close! else @handle_data[data] end rescue JSON::ParserError => e - self.send_data!({"error" => "JSON provided is invalid. See http://json.org/ to see how to format correctly."}) + send_data!({"error" => "JSON provided is invalid. See http://json.org/ to see how to format correctly."}) rescue IOError => e - self.close! + close! end end end end # I think you can guess this one - def connected?; @socket.closed? == false and @closed == false; end + def connected?; self.class.connections.include?(self); end # closes the connection and the threads and stuff for this user def close! - @closed = true - puts "User #{self.inspect} disconnecting" if self.class.log? - @parent.event(:disconnect, self) if @parent + return if @disconnected == true + @disconnected = true + puts "User #{inspect} disconnecting" if self.class.log? + # notify the remote side notify!('**remote__disconnecting**') rescue nil - @parent.users_mutex.synchronize { @parent.users.delete(self) } if @parent + if @parent + @parent.event(:disconnect, self) + @parent.incoming_mutex.synchronize { @parent.incoming.delete(self) } + else + self.class.outgoing_mutex.synchronize { self.class.outgoing.delete(self) } + end - @socket.close rescue nil + Thread.new { sleep(1); @socket.close rescue nil } end # send a notification to this user - def notify!(method, *args) - puts "Notify #{self.__inspect}: #{method}(#{args.map { |i| i.inspect }.join(', ')})" if self.__class.log? - self.__send_data!({'method' => method.to_s, 'params' => args, 'id' => nil}) + def notify!(method, *args, &blk) + puts "Notify #{inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log? + send_data!({'method' => method.to_s, 'params' => args, 'id' => nil}) end # sends a normal RPC request that has a response - def send!(method, *args) - puts "Call #{self.__inspect}: #{method}(#{args.map { |i| i.inspect }.join(', ')})" if self.__class.log? - id = self.__get_unique_number - self.send_data! 'method' => method.to_s, 'params' => args, 'id' => id + def send!(method, *args, &blk) + puts "Call #{inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log? + id = get_unique_number + send_data! 'method' => method.to_s, 'params' => args, 'id' => id - while @responses_mutex.synchronize { @responses.keys.include?(id) } == false - sleep(0.01) + worker = Proc.new do + sleep 0.1 until @responses_mutex.synchronize { @responses.keys.include?(id) } + + result = Legs::Result.new(@responses_mutex.synchronize { @responses.delete(id) }) + puts ">> #{method} #=> #{result.data['result'].inspect}" if self.class.log? + result end - data = @responses_mutex.synchronize { @responses.delete(id) } - - error = data['error'] - raise error unless error.nil? - - puts ">> #{method} #=> #{data['result'].inspect}" if self.__class.log? - - return data['result'] + if blk.respond_to?(:call); Thread.new { blk[worker.call] } + else; worker.call.value; end end - def inspect - "<Legs:#{__object_id} Meta: #{@meta.inspect}>" - end + # catch all the rogue calls and make them work niftily + alias_method :method_missing, :send! - # does an async request which calls a block when response arrives - def send_async!(method, *args, &blk) - puts "Call #{self.__inspect}: #{method}(#{args.map { |i| i.inspect }.join(', ')})" if self.__class.log? - id = self.__get_unique_number - self.send_data! 'method' => method.to_s, 'params' => args, 'id' => id - - Thread.new do - while @responses_mutex.synchronize { @responses.keys.include?(id) } == false - sleep(0.05) - end - - data = @responses_mutex.synchronize { @responses.delete(id) } - puts ">> #{method} #=> #{data['result'].inspect}" if self.__class.log? unless data['error'] - blk[Legs::AsyncData.new(data)] - end - end - - # maps undefined methods in to rpc calls - def method_missing(method, *args) - return self.send(method, *args) if method.to_s =~ /^__/ - send!(method, *args) - end - - # hacks the send method so ancestor methods instead become rpc calls too - # if you want to use a method in a Legs superclass, prefix with __ - def send(method, *args) - return super(method.to_s.sub(/^__/, ''), *args) if method.to_s =~ /^__/ - return super(method, *args) if self.__public_methods(false).include?(method) - super('send!', method.to_s, *args) - end - # sends raw object over the socket def send_data!(data) raise "Lost remote connection" unless connected? - @socket_mutex.synchronize { @socket.write(JSON.generate(__json_marshal(data)) + self.__class.terminator) } + raw = JSON.generate(json_marshal(data)) + self.class.terminator + @socket_mutex.synchronize { @socket.write(raw) } end private @@ -144,208 +122,273 @@ case object when Bignum, Fixnum, Integer, Float, TrueClass, FalseClass, String, NilClass return object when Hash out = Hash.new - object.each_pair { |k,v| out[k.to_s] = __json_marshal(v) } + object.each_pair { |k,v| out[k.to_s] = json_marshal(v) } return out when Array - return object.map { |v| __json_marshal(v) } + return object.map { |v| json_marshal(v) } + when Symbol + return {'__jsonclass__' => ['Legs', '__make_symbol', object.to_s]} + when Exception + return {'__jsonclass__' => ['Legs::RemoteError', 'new', "<#{object.class.name}> #{object.message}", object.backtrace]} else - return {'__jsonclass__' => [object.class.name, object._dump]} if object.respond_to?(:_dump) + return {'__jsonclass__' => [object.class.name, '_load', object._dump]} if object.respond_to?(:_dump) # the default marshalling behaviour instance_vars = {} object.instance_variables.each do |var_name| - instance_vars[var_name.to_s.sub(/@/, '')] = self.__json_marshal(object.instance_variable_get(var_name)) + instance_vars[var_name.to_s.sub(/@/, '')] = json_marshal(object.instance_variable_get(var_name)) end - return {'__jsonclass__' => [object.class.name]}.merge(instance_vars) + return {'__jsonclass__' => [object.class.name, 'new']}.merge(instance_vars) end end + SAFE_CONSTRUCTORS = ['new', 'allocate', '_load'] + # takes an object from the network, and decodes any marshalled hashes back in to ruby objects def json_restore(object) case object when Hash if object.keys.include? '__jsonclass__' constructor = object.delete('__jsonclass__') class_name = constructor.shift.to_s - object_class = Module.const_get(class_name) rescue false - if object_class.name == class_name - return object_class._load(*constructor) if object_class.respond_to?(:_load) unless constructor.empty? + # find the constant through the heirachy + object_class = Module + class_name.split(/::/).each { |piece_of_const| object_class = object_class.const_get(piece_of_const) } rescue false + + if object_class + unless constructor.empty? + raise "Unsafe marshaling constructor method: #{constructor.first}" unless (object_class == Legs and constructor.first =~ /^__make_/) or SAFE_CONSTRUCTORS.include?(constructor.first) + raise "#{class_name} doesn't support the #{constructor.first} constructor" unless object_class.respond_to?(constructor.first) + instance = object_class.__send__(*constructor) + else + instance = object_class.allocate + end - instance = object_class.allocate object.each_pair do |key, value| - instance.instance_variable_set("@#{key}", self.__json_restore(value)) + instance.instance_variable_set("@#{key}", json_restore(value)) end return instance else - raise "Response contains a #{class_name} but that class is not loaded locally." + raise "Response contains a #{class_name} but that class is not loaded locally, it needs to be." end else hash = Hash.new - object.each_pair { |k,v| hash[k] = self.__json_restore(v) } + object.each_pair { |k,v| hash[k] = json_restore(v) } return hash end when Array - return object.map { |i| self.__json_restore(i) } + return object.map { |i| json_restore(i) } else return object end end # gets a unique number that we can use to match requests to responses def get_unique_number; @unique_id ||= 0; @unique_id += 1; end end +# undef's the superclass's methods so they won't get in the way +removal_list = Legs.instance_methods(true) +removal_list -= %w{JSON new class object_id send __send__ __id__ < <= <=> => > == === yield raise} +removal_list -= Legs.instance_methods(false) +Legs.class_eval { removal_list.each { |m| undef_method m } } + # the server is started by subclassing Legs, then SubclassName.start class << Legs attr_accessor :terminator, :log - attr_reader :users, :server_object, :users_mutex, :messages_mutex + attr_reader :incoming, :outgoing, :server_object, :incoming_mutex, :outgoing_mutex, :messages_mutex alias_method :log?, :log + alias_method :users, :incoming + def started?; @started; end def initializer ObjectSpace.define_finalizer(self) { self.stop! } - @users = []; @messages = Queue.new; @terminator = "\n"; @log = true - @users_mutex = Mutex.new + @incoming = []; @outgoing = []; @messages = Queue.new; @terminator = "\n"; @log = false + @incoming_mutex = Mutex.new; @outgoing_mutex = Mutex.new; @started = false end # starts the server, pass nil for port to make a 'server' that doesn't actually accept connections # This is useful for adding methods to Legs so that systems you connect to can call methods back on you def start(port=30274, &blk) - return if started? - raise "Legs.start requires a block" unless blk + return @server_class.module_eval(&blk) if started? and blk.respond_to? :call @started = true - # make the fake class - @server_class = Class.new - @server_class.module_eval { private; attr_reader :server, :caller; public } - @server_class.module_eval(&blk) - @server_object = @server_class.allocate - @server_object.instance_variable_set(:@server, self) - @server_object.instance_eval { initialize } + # makes a nice clean class to hold all the server methods. + if @server_class.nil? + @server_class = Class.new + @server_class.module_eval do + private + attr_reader :server, :caller + + # sends a notification message to all connected clients + def broadcast(*args) + if args.first.is_a?(Array) + list = args.shift + method = args.shift + elsif args.first.is_a?(String) or args.first.is_a?(Symbol) + list = server.incoming + method = args.shift + else + raise "You need to specify a 'method' to broadcast out to" + end + + list.each { |user| user.notify!(method, *args) } + end + + # Finds a user by the value of a certain property... like find_user_by :object_id, 12345 + def find_user_by_object_id value + server.incoming.find { |user| user.object_id == value } + end + + # finds user's with the specified meta keys matching the specified values, can use regexps and stuff, like a case block + def find_users_by_meta hash = nil + raise "You need to give find_users_by_meta a hash to check the user's meta hash against" if hash.nil? + server.incoming.select do |user| + hash.all? { |key, value| value === user.meta[key] } + end + end + + public # makes it public again for the user code + end + end + @server_class.module_eval(&blk) if blk.respond_to?(:call) + + if @server_object.nil? + @server_object = @server_class.allocate + @server_object.instance_variable_set(:@server, self) + @server_object.instance_eval { initialize } + end + @message_processor = Thread.new do while started? - sleep(0.01) and next if @messages.empty? + sleep 0.01 while @messages.empty? data, from = @messages.deq method = data['method']; params = data['params'] methods = @server_object.public_methods(false) - begin - raise "Supplied method is not a String" unless method.is_a?(String) - raise "Supplied params object is not an Array" unless params.is_a?(Array) - raise "Cannot run '#{method}' because it is not defined in this server" unless methods.include?(method.to_s) or methods.include? :method_missing - - puts "Call #{method}(#{params.map { |i| i.inspect }.join(', ')})" if log? - - @server_object.instance_variable_set(:@caller, from) - - result = nil - - @users_mutex.synchronize do - if methods.include?(method.to_s) - result = @server_object.__send__(method.to_s, *params) - else - result = @server_object.method_missing(method.to_s, *params) + # close dead connections + if data['method'] == '**remote__disconnecting**' + from.close! + next + else + begin + raise "Supplied method is not a String" unless method.is_a?(String) + raise "Supplied params object is not an Array" unless params.is_a?(Array) + raise "Cannot run '#{method}' because it is not defined in this server" unless methods.include?(method.to_s) or methods.include? :method_missing + + puts "Call #{method}(#{params.map(&:inspect).join(', ')})" if log? + + @server_object.instance_variable_set(:@caller, from) + + result = nil + + @incoming_mutex.synchronize do + if methods.include?(method.to_s) + result = @server_object.__send__(method.to_s, *params) + else + result = @server_object.method_missing(method.to_s, *params) + end end + + puts ">> #{method} #=> #{result.inspect}" if log? + + from.send_data!({'id' => data['id'], 'result' => result}) unless data['id'].nil? + + rescue Exception => e + from.send_data!({'error' => e, 'id' => data['id']}) unless data['id'].nil? + puts "Error: #{e}\nBacktrace: " + e.backtrace.join("\n ") if log? end - - puts ">> #{method} #=> #{result.inspect}" if log? - - from.send_data!({'id' => data['id'], 'result' => result}) unless data['id'].nil? - - rescue Exception => e - from.send_data!({'error' => e, 'id' => data['id']}) unless data['id'].nil? - puts "Backtrace: \n" + e.backtrace.map { |i| " #{i}" }.join("\n") if log? end end - end + end unless @message_processor and @message_processor.alive? - unless port.nil? or port == false + if ( port.nil? or port == false ) == false and @listener.nil? @listener = TCPServer.new(port) @acceptor_thread = Thread.new do while started? user = Legs.new(@listener.accept, self) - @users_mutex.synchronize { @users.push user } - puts "User #{user.object_id} connected, number of users: #{@users.length}" if log? + @incoming_mutex.synchronize { @incoming.push user } + puts "User #{user.object_id} connected, number of users: #{@incoming.length}" if log? self.event :connect, user end end end end # stops the server, disconnects the clients def stop @started = false - @users.each { |user| user.close! } + @incoming.each { |user| user.close! } end - # sends a notification message to all connected clients - def broadcast(method, *args) - @users.each { |user| user.notify!(method, *args) } + # returns an array of all connections + def connections + @incoming + @outgoing end - # Finds a user by the value of a certain property... like find_user_by :object_id, 12345 - def find_user_by property, value - @users.find { |user| user.__send(property) == value } - end - - def find_users_by property, *values - @users.select { |user| user.__send(property) == value } - end - - # gives you an array of all the instances of Legs which are still connected - # direction can be :both, :in, or :out - def connections direction = :both - return @users if direction == :in - list = Array.new - ObjectSpace.each_object(self) do |leg| - next if list.include?(leg) unless leg.connected? - next unless leg.parent == false if direction == :out - list.push leg - end - return list - end - # add an event call to the server's message queue def event(name, sender, *extras) return unless @server_object.respond_to?("on_#{name}") __data!({'method' => "on_#{name}", 'params' => extras.to_a, 'id' => nil}, sender) end - # gets called to handle all incomming messages (RPC requests) + # gets called to handle all incoming messages (RPC requests) def __data!(data, from) @messages.enq [data, from] end - # returns true if server is running - def started?; @started; end - - # creates a legs client, and passes it to supplied block, closes client after block finishes running - # I wouldn't have added this method to keep shoes small, but users insist comedic value makes it worthwhile - def open(*args, &blk) + # People say this syntax is too funny not to have... whatever. Works like IO and File and what have you + def open(*args) client = Legs.new(*args) - blk[client] + yield(client) client.close! end + + # add's a method to the 'server' + def define_method(name, &blk); @server_class.class_eval { define_method(name, &blk) }; end + + # lets the marshaler transport symbols + def __make_symbol(name); name.to_sym; end + + # hooks up these methods so you can use them off the main object too! + [:broadcast, :find_user_by_object_id, :find_users_by_meta].each do |name| + define_method name do |*args| + @incoming_mutex.synchronize do; @outgoing_mutex.synchronize do + @server_object.__send__(name, *args) + end; end + end + end end Legs.initializer - -class Legs::AsyncData +# represents the data response, handles throwing of errors and stuff +class Legs::Result + attr_reader :data def initialize(data); @data = data; end def result - @errored = true and raise @data['error'] if @data['error'] unless @errored - return @data['result'] + unless @data['error'].nil? or @errored + @errored = true + raise @data['error'] + end + @data['result'] end alias_method :value, :result end class Legs::StartBlockError < StandardError; end class Legs::RequestError < StandardError; end +class Legs::RemoteError < StandardError + def initialize(msg, backtrace) + super(msg) + set_backtrace(backtrace) + end +end