lib/knj/process_meta.rb in knjrbfw-0.0.21 vs lib/knj/process_meta.rb in knjrbfw-0.0.22
- old
+ new
@@ -5,11 +5,14 @@
attr_reader :process, :pid
def initialize(args = {})
@args = args
@objects = {}
+
+ #These variables are used to free memory in the subprocess, by using ObjectSpace#define_finalizer. The Mutex is the avoid problems when writing to the finalize-array multithreadded.
@finalize = []
+ @finalize_mutex = Mutex.new
if @args["exec_path"]
exec_path = @args["exec_path"]
else
exec_path = Knj::Os.executed_executable
@@ -67,26 +70,36 @@
end
end
end
def proxy_finalizer(id)
- @finalize << id
+ @finalize_mutex.synchronize do
+ @finalize << id
+ end
end
def check_finalizers
return nil if @finalize.empty?
- remove = []
- @finalize.each do |id|
- @process.send("obj" => {
- "type" => "unset",
- "var_name" => id
- })
- remove << id
+ finalize = nil
+ @finalize_mutex.synchronize do
+ finalize = @finalize
+ @finalize = []
+
+ begin
+ @process.send("obj" => {
+ "type" => "unset_multiple",
+ "var_names" => finalize
+ })
+ rescue => e
+ if e.message.to_s.index("Var-name didnt exist when trying to unset:")
+ #ignore.
+ else
+ raise e
+ end
+ end
end
-
- @finalize -= remove
end
#Parses the arguments given. Proxy-object-arguments will be their natural objects in the subprocess.
def self.args_parse(args)
if args.is_a?(Array)
@@ -182,10 +195,14 @@
raise "Unknown result: '#{res}'."
end
#Spawns a new object in the subprocess by that classname, with those arguments and with that block.
def new(class_name, *args, &block)
+ #We need to check finalizers first, so we wont accidently reuse an ID, which will then be unset in the process.
+ self.check_finalizers
+
+ #Spawn and return the object.
return self.spawn_object(class_name, nil, *args, &block)
end
#Spawns a new object in the subprocess and returns a proxy-variable for that subprocess-object.
def spawn_object(class_name, var_name = nil, *args, &block)
@@ -222,23 +239,27 @@
return "Unknown result: '#{res}'."
end
#Calls a method on an object and returns the result.
def call_object(args, &block)
- self.check_finalizers
-
if args.key?("capture_return")
capture_return = args["capture_return"]
else
capture_return = true
end
+ if args["buffered"]
+ type = "call_object_buffered"
+ else
+ type = "call_object_block"
+ end
+
res = @process.send(
{
"buffer_use" => args["buffer_use"],
"obj" => {
- "type" => "call_object_block",
+ "type" => type,
"var_name" => args["var_name"],
"method_name" => args["method_name"],
"capture_return" => capture_return,
"args" => Knj::Process_meta.args_parse(args["args"])
}
@@ -327,13 +348,18 @@
#Destroyes the project and unsets all variables on the Process_meta-object.
def destroy
@process.send("obj" => {"type" => "exit"})
@err_thread.kill if @err_thread
@process.destroy
- Process.kill("TERM", @pid)
begin
+ Process.kill("TERM", @pid)
+ rescue Errno::ESRCH
+ #Process is already dead - ignore.
+ end
+
+ begin
sleep 0.1
process_exists = Knj::Unix_proc.list("pids" => [@pid])
raise "Process exists." if !process_exists.empty?
rescue => e
raise e if e.message != "Process exists."
@@ -407,7 +433,116 @@
"buffer_use" => @_process_meta_block_buffer_use,
"capture_return" => false
},
&block
)
+ end
+
+ def _pm_buffered_caller(args)
+ return Knj::Process_meta::Proxy_obj::Buffered_caller.new({
+ :name => @args[:name],
+ :process_meta => @args[:process_meta]
+ }.merge(args))
+ end
+end
+
+class Knj::Process_meta::Proxy_obj::Buffered_caller
+ def initialize(args)
+ @args = args
+ @buffer = []
+ @mutex = Mutex.new
+ @mutex_write = Mutex.new
+ @count = 0
+ @debug = @args[:debug] if @args[:debug]
+
+ if @args[:count_to]
+ @count_to = @args[:count_to]
+ else
+ @count_to = 1000
+ end
+
+ @buffer_max = @count_to * 2
+ @threads = [] if @args[:async]
+ end
+
+ def method_missing(method_name, *args)
+ if method_name.to_s == @args[:method_name].to_s
+ self._pm_call(*args)
+ else
+ raise NoMethodError, "No such method: '#{method_name}'."
+ end
+ end
+
+ def _pm_call(*args)
+ raise @raise_error if @raise_error
+
+ @mutex.synchronize do
+ while @count >= @count_to and @buffer.length >= @buffer_max
+ STDOUT.print "Waiting for write to complete...\n" if @debug
+ sleep 0.1
+ end
+
+ STDOUT.print "Adding to buffer #{@buffer.length}...\n" if @debug
+ @buffer << args
+ @count += 1
+ end
+
+ self._pm_flush if @count >= @count_to and !@writing
+ return nil
+ end
+
+ def _pm_flush(*args)
+ raise @raise_error if @raise_error
+
+ buffer = nil
+ @mutex.synchronize do
+ buffer = @buffer
+ @buffer = []
+ @count = 0
+ end
+
+ if @args[:async]
+ begin
+ @threads << Thread.new do
+ self._pm_flush_real(buffer)
+ end
+ rescue => e
+ @raise_error = e
+ end
+
+ return nil
+ else
+ return self._pm_flush_real(buffer)
+ end
+ end
+
+ def _pm_flush_real(buffer)
+ @mutex_write.synchronize do
+ STDOUT.print "Writing...\n" if @debug
+ @writing = true
+
+ begin
+ return @args[:process_meta].call_object(
+ "var_name" => @args[:name],
+ "method_name" => @args[:method_name],
+ "args" => buffer,
+ "buffered" => true,
+ "capture_return" => false
+ )
+ ensure
+ @writing = false
+ end
+ end
+ end
+
+ def _pm_close
+ self._pm_flush
+
+ if @args[:async]
+ @threads.each do |thread|
+ thread.join
+ end
+ end
+
+ raise @raise_error if @raise_error
end
end
\ No newline at end of file