lib/knj/process_meta.rb in knjrbfw-0.0.21 vs lib/knj/process_meta.rb in knjrbfw-0.0.22

- old
+ new

@@ -5,11 +5,14 @@ attr_reader :process, :pid def initialize(args = {}) @args = args @objects = {} + + #These variables are used to free memory in the subprocess, by using ObjectSpace#define_finalizer. The Mutex is the avoid problems when writing to the finalize-array multithreadded. @finalize = [] + @finalize_mutex = Mutex.new if @args["exec_path"] exec_path = @args["exec_path"] else exec_path = Knj::Os.executed_executable @@ -67,26 +70,36 @@ end end end def proxy_finalizer(id) - @finalize << id + @finalize_mutex.synchronize do + @finalize << id + end end def check_finalizers return nil if @finalize.empty? - remove = [] - @finalize.each do |id| - @process.send("obj" => { - "type" => "unset", - "var_name" => id - }) - remove << id + finalize = nil + @finalize_mutex.synchronize do + finalize = @finalize + @finalize = [] + + begin + @process.send("obj" => { + "type" => "unset_multiple", + "var_names" => finalize + }) + rescue => e + if e.message.to_s.index("Var-name didnt exist when trying to unset:") + #ignore. + else + raise e + end + end end - - @finalize -= remove end #Parses the arguments given. Proxy-object-arguments will be their natural objects in the subprocess. def self.args_parse(args) if args.is_a?(Array) @@ -182,10 +195,14 @@ raise "Unknown result: '#{res}'." end #Spawns a new object in the subprocess by that classname, with those arguments and with that block. def new(class_name, *args, &block) + #We need to check finalizers first, so we wont accidently reuse an ID, which will then be unset in the process. + self.check_finalizers + + #Spawn and return the object. return self.spawn_object(class_name, nil, *args, &block) end #Spawns a new object in the subprocess and returns a proxy-variable for that subprocess-object. def spawn_object(class_name, var_name = nil, *args, &block) @@ -222,23 +239,27 @@ return "Unknown result: '#{res}'." end #Calls a method on an object and returns the result. def call_object(args, &block) - self.check_finalizers - if args.key?("capture_return") capture_return = args["capture_return"] else capture_return = true end + if args["buffered"] + type = "call_object_buffered" + else + type = "call_object_block" + end + res = @process.send( { "buffer_use" => args["buffer_use"], "obj" => { - "type" => "call_object_block", + "type" => type, "var_name" => args["var_name"], "method_name" => args["method_name"], "capture_return" => capture_return, "args" => Knj::Process_meta.args_parse(args["args"]) } @@ -327,13 +348,18 @@ #Destroyes the project and unsets all variables on the Process_meta-object. def destroy @process.send("obj" => {"type" => "exit"}) @err_thread.kill if @err_thread @process.destroy - Process.kill("TERM", @pid) begin + Process.kill("TERM", @pid) + rescue Errno::ESRCH + #Process is already dead - ignore. + end + + begin sleep 0.1 process_exists = Knj::Unix_proc.list("pids" => [@pid]) raise "Process exists." if !process_exists.empty? rescue => e raise e if e.message != "Process exists." @@ -407,7 +433,116 @@ "buffer_use" => @_process_meta_block_buffer_use, "capture_return" => false }, &block ) + end + + def _pm_buffered_caller(args) + return Knj::Process_meta::Proxy_obj::Buffered_caller.new({ + :name => @args[:name], + :process_meta => @args[:process_meta] + }.merge(args)) + end +end + +class Knj::Process_meta::Proxy_obj::Buffered_caller + def initialize(args) + @args = args + @buffer = [] + @mutex = Mutex.new + @mutex_write = Mutex.new + @count = 0 + @debug = @args[:debug] if @args[:debug] + + if @args[:count_to] + @count_to = @args[:count_to] + else + @count_to = 1000 + end + + @buffer_max = @count_to * 2 + @threads = [] if @args[:async] + end + + def method_missing(method_name, *args) + if method_name.to_s == @args[:method_name].to_s + self._pm_call(*args) + else + raise NoMethodError, "No such method: '#{method_name}'." + end + end + + def _pm_call(*args) + raise @raise_error if @raise_error + + @mutex.synchronize do + while @count >= @count_to and @buffer.length >= @buffer_max + STDOUT.print "Waiting for write to complete...\n" if @debug + sleep 0.1 + end + + STDOUT.print "Adding to buffer #{@buffer.length}...\n" if @debug + @buffer << args + @count += 1 + end + + self._pm_flush if @count >= @count_to and !@writing + return nil + end + + def _pm_flush(*args) + raise @raise_error if @raise_error + + buffer = nil + @mutex.synchronize do + buffer = @buffer + @buffer = [] + @count = 0 + end + + if @args[:async] + begin + @threads << Thread.new do + self._pm_flush_real(buffer) + end + rescue => e + @raise_error = e + end + + return nil + else + return self._pm_flush_real(buffer) + end + end + + def _pm_flush_real(buffer) + @mutex_write.synchronize do + STDOUT.print "Writing...\n" if @debug + @writing = true + + begin + return @args[:process_meta].call_object( + "var_name" => @args[:name], + "method_name" => @args[:method_name], + "args" => buffer, + "buffered" => true, + "capture_return" => false + ) + ensure + @writing = false + end + end + end + + def _pm_close + self._pm_flush + + if @args[:async] + @threads.each do |thread| + thread.join + end + end + + raise @raise_error if @raise_error end end \ No newline at end of file