lib/libuv/loop.rb in libuv-0.11.3 vs lib/libuv/loop.rb in libuv-0.11.4

- old
+ new

@@ -1,385 +1,387 @@ -require 'thread' - -module Libuv - class Loop - include Resource, Assertions - - - LOOPS = ThreadSafe::Cache.new - - - module ClassMethods - # Get default loop - # - # @return [::Libuv::Loop] - def default - return current || create(::Libuv::Ext.default_loop) - end - - # Create new Libuv loop - # - # @return [::Libuv::Loop] - def new - return current || create(::Libuv::Ext.loop_new) - end - - # Build a Ruby Libuv loop from an existing loop pointer - # - # @return [::Libuv::Loop] - def create(pointer) - allocate.tap { |i| i.send(:initialize, FFI::AutoPointer.new(pointer, ::Libuv::Ext.method(:loop_delete))) } - end - - # Checks for the existence of a loop on the current thread - # - # @return [::Libuv::Loop | nil] - def current - LOOPS[Thread.current] - end - end - extend ClassMethods - - - # Initialize a loop using an FFI::Pointer to a libuv loop - def initialize(pointer) # :notnew: - @pointer = pointer - @loop = self - - # Create an async call for scheduling work from other threads - @run_queue = Queue.new - @queue_proc = proc do - # ensure we only execute what was required for this tick - length = @run_queue.length - length.times do - begin - run = @run_queue.pop true # pop non-block - run.call - rescue Exception => e - @loop.log :error, :next_tick_cb, e - end - end - end - @process_queue = Async.new(@loop, @queue_proc) - - # Create a next tick timer - @next_tick = @loop.timer do - @next_tick_scheduled = false - @queue_proc.call - end - - # Create an async call for ending the loop - @stop_loop = Async.new @loop do - LOOPS.delete(@reactor_thread) - @reactor_thread = nil - @process_queue.close - @stop_loop.close - @next_tick.close - - ::Libuv::Ext.stop(@pointer) - end - end - - def handle; @pointer; end - - # Run the actual event loop. This method will block until the loop is stopped. - # - # @param run_type [:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT] - # @yieldparam promise [::Libuv::Q::Promise] Yields a promise that can be used for logging unhandled - # exceptions on the loop. - def run(run_type = :UV_RUN_DEFAULT) - if @reactor_thread.nil? - @loop_notify = @loop.defer - - begin - @reactor_thread = Thread.current - LOOPS[@reactor_thread] = @loop - yield @loop_notify.promise if block_given? - ::Libuv::Ext.run(@pointer, run_type) # This is blocking - ensure - @reactor_thread = nil - @run_queue.clear - end - end - @loop - end - - - # Creates a deferred result object for where the result of an operation may only be returned - # at some point in the future or is being processed on a different thread (thread safe) - # - # @return [::Libuv::Q::Deferred] - def defer - Q.defer(@loop) - end - - # Combines multiple promises into a single promise that is resolved when all of the input - # promises are resolved. (thread safe) - # - # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise - # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values, - # each value corresponding to the promise at the same index in the `promises` array. If any of - # the promises is resolved with a rejection, this resulting promise will be resolved with the - # same rejection. - def all(*promises) - Q.all(@loop, *promises) - end - - # - # Combines multiple promises into a single promise that is resolved when any of the input - # promises are resolved. - # - # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise - # @return [::Libuv::Q::Promise] Returns a single promise - def any(*promises) - Q.any(@loop, *promises) - end - - # - # Combines multiple promises into a single promise that is resolved when all of the input - # promises are resolved or rejected. - # - # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise - # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values, - # each [result, wasResolved] value pair corresponding to a at the same index in the `promises` array. - def finally(*promises) - Q.finally(@loop, *promises) - end - - - # forces loop time update, useful for getting more granular times - # - # @return nil - def update_time - ::Libuv::Ext.update_time(@pointer) - end - - # Get current time in milliseconds - # - # @return [Fixnum] - def now - ::Libuv::Ext.now(@pointer) - end - - # Lookup an error code and return is as an error object - # - # @param err [Integer] The error code to look up. - # @return [::Libuv::Error] - def lookup_error(err) - name = ::Libuv::Ext.err_name(err) - msg = ::Libuv::Ext.strerror(err) - - ::Libuv::Error.const_get(name.to_sym).new(msg) - rescue Exception => e - @loop.log :warn, :error_lookup_failed, e - ::Libuv::Error::UNKNOWN.new("error lookup failed for code #{err} #{name} #{msg}") - end - - # Get a new TCP instance - # - # @return [::Libuv::TCP] - def tcp - TCP.new(@loop) - end - - # Get a new UDP instance - # - # @return [::Libuv::UDP] - def udp - UDP.new(@loop) - end - - # Get a new TTY instance - # - # @param fileno [Integer] Integer file descriptor of a tty device - # @param readable [true, false] Boolean indicating if TTY is readable - # @return [::Libuv::TTY] - def tty(fileno, readable = false) - assert_type(Integer, fileno, "io#fileno must return an integer file descriptor, #{fileno.inspect} given") - - TTY.new(@loop, fileno, readable) - end - - # Get a new Pipe instance - # - # @param ipc [true, false] indicate if a handle will be used for ipc, useful for sharing tcp socket between processes - # @return [::Libuv::Pipe] - def pipe(ipc = false) - Pipe.new(@loop, ipc) - end - - # Get a new timer instance - # - # @param callback [Proc] the callback to be called on timer trigger - # @return [::Libuv::Timer] - def timer(callback = nil, &blk) - Timer.new(@loop, callback || blk) - end - - # Get a new Prepare handle - # - # @return [::Libuv::Prepare] - def prepare(callback = nil, &blk) - Prepare.new(@loop, callback || blk) - end - - # Get a new Check handle - # - # @return [::Libuv::Check] - def check(callback = nil, &blk) - Check.new(@loop, callback || blk) - end - - # Get a new Idle handle - # - # @param callback [Proc] the callback to be called on idle trigger - # @return [::Libuv::Idle] - def idle(callback = nil, &block) - Idle.new(@loop, callback || block) - end - - # Get a new Async handle - # - # @return [::Libuv::Async] - def async(callback = nil, &block) - callback ||= block - handle = Async.new(@loop) - handle.progress callback if callback - handle - end - - # Get a new signal handler - # - # @return [::Libuv::Signal] - def signal(signum = nil, callback = nil, &block) - callback ||= block - handle = Signal.new(@loop) - handle.progress callback if callback - handle.start(signum) if signum - handle - end - - # Queue some work for processing in the libuv thread pool - # - # @param callback [Proc] the callback to be called in the thread pool - # @return [::Libuv::Work] - # @raise [ArgumentError] if block is not given - def work(callback = nil, &block) - callback ||= block - assert_block(callback) - Work.new(@loop, callback) # Work is a promise object - end - - # Lookup a hostname - # - # @param hostname [String] the domain name to lookup - # @param port [Integer, String] the service being connected too - # @param callback [Proc] the callback to be called on success - # @return [::Libuv::Dns] - def lookup(hostname, hint = :IPv4, port = 9, &block) - dns = Dns.new(@loop, hostname, port, hint) # Work is a promise object - dns.then block if block_given? - dns - end - - # Get a new FSEvent instance - # - # @param path [String] the path to the file or folder for watching - # @return [::Libuv::FSEvent] - # @raise [ArgumentError] if path is not a string - def fs_event(path) - assert_type(String, path) - FSEvent.new(@loop, path) - end - - # Opens a file and returns an object that can be used to manipulate it - # - # @param path [String] the path to the file or folder for watching - # @param flags [Integer] see ruby File::Constants - # @param mode [Integer] - # @return [::Libuv::File] - def file(path, flags = 0, mode = 0) - assert_type(String, path, "path must be a String") - assert_type(Integer, flags, "flags must be an Integer") - assert_type(Integer, mode, "mode must be an Integer") - File.new(@loop, path, flags, mode) - end - - # Returns an object for manipulating the filesystem - # - # @return [::Libuv::Filesystem] - def filesystem - Filesystem.new(@loop) - end - - # Schedule some work to be processed on the event loop as soon as possible (thread safe) - # - # @param callback [Proc] the callback to be called on the reactor thread - # @raise [ArgumentError] if block is not given - def schedule(callback = nil, &block) - callback ||= block - assert_block(callback) - - if reactor_thread? - block.call - else - @run_queue << callback - @process_queue.call - end - end - - # Queue some work to be processed in the next iteration of the event loop (thread safe) - # - # @param callback [Proc] the callback to be called on the reactor thread - # @raise [ArgumentError] if block is not given - def next_tick(callback = nil, &block) - callback ||= block - assert_block(callback) - - @run_queue << callback - if reactor_thread? - # Create a next tick timer - if not @next_tick_scheduled - @next_tick.start(0) - @next_tick_scheduled = true - end - else - @process_queue.call - end - end - - # Notifies the loop there was an event that should be logged - # - # @param level [Symbol] the error level (info, warn, error etc) - # @param id [Object] some kind of identifying information - # @param *args [*args] any additional information - def log(level, id, *args) - @loop_notify.notify(level, id, *args) - end - - # Closes handles opened by the loop class and completes the current loop iteration (thread safe) - def stop - @stop_loop.call - end - - # True if the calling thread is the same thread as the reactor. - # - # @return [Boolean] - def reactor_thread? - @reactor_thread == Thread.current - end - - # Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endevor). - # - # @return [Thread] - def reactor_thread - @reactor_thread - end - - # Tells you whether the Libuv reactor loop is currently running. - # - # @return [Boolean] - def reactor_running? - !@reactor_thread.nil? - end - end -end +require 'thread' + +module Libuv + class Loop + include Resource, Assertions + + + LOOPS = ThreadSafe::Cache.new + + + module ClassMethods + # Get default loop + # + # @return [::Libuv::Loop] + def default + return @default ||= create(::Libuv::Ext.default_loop) + end + + # Create new Libuv loop + # + # @return [::Libuv::Loop] + def new + return create(::Libuv::Ext.loop_new) + end + + # Build a Ruby Libuv loop from an existing loop pointer + # + # @return [::Libuv::Loop] + def create(pointer) + allocate.tap { |i| i.send(:initialize, FFI::AutoPointer.new(pointer, ::Libuv::Ext.method(:loop_delete))) } + end + + # Checks for the existence of a loop on the current thread + # + # @return [::Libuv::Loop | nil] + def current + LOOPS[Thread.current] + end + end + extend ClassMethods + + + # Initialize a loop using an FFI::Pointer to a libuv loop + def initialize(pointer) # :notnew: + @pointer = pointer + @loop = self + + # Create an async call for scheduling work from other threads + @run_queue = Queue.new + @queue_proc = proc do + # ensure we only execute what was required for this tick + length = @run_queue.length + length.times do + begin + run = @run_queue.pop true # pop non-block + run.call + rescue Exception => e + @loop.log :error, :next_tick_cb, e + end + end + end + @process_queue = Async.new(@loop, @queue_proc) + + # Create a next tick timer + @next_tick = @loop.timer do + @next_tick_scheduled = false + @queue_proc.call + end + + # Create an async call for ending the loop + @stop_loop = Async.new @loop do + LOOPS.delete(@reactor_thread) + @reactor_thread = nil + @process_queue.close + @stop_loop.close + @next_tick.close + + ::Libuv::Ext.stop(@pointer) + end + end + + def handle; @pointer; end + + # Run the actual event loop. This method will block until the loop is stopped. + # + # @param run_type [:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT] + # @yieldparam promise [::Libuv::Q::Promise] Yields a promise that can be used for logging unhandled + # exceptions on the loop. + def run(run_type = :UV_RUN_DEFAULT) + if @reactor_thread.nil? + @loop_notify = @loop.defer + + begin + @reactor_thread = Thread.current + LOOPS[@reactor_thread] = @loop + yield @loop_notify.promise if block_given? + ::Libuv::Ext.run(@pointer, run_type) # This is blocking + ensure + @reactor_thread = nil + @run_queue.clear + end + elsif block_given? + schedule { yield @loop_notify.promise } + end + @loop + end + + + # Creates a deferred result object for where the result of an operation may only be returned + # at some point in the future or is being processed on a different thread (thread safe) + # + # @return [::Libuv::Q::Deferred] + def defer + Q.defer(@loop) + end + + # Combines multiple promises into a single promise that is resolved when all of the input + # promises are resolved. (thread safe) + # + # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise + # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values, + # each value corresponding to the promise at the same index in the `promises` array. If any of + # the promises is resolved with a rejection, this resulting promise will be resolved with the + # same rejection. + def all(*promises) + Q.all(@loop, *promises) + end + + # + # Combines multiple promises into a single promise that is resolved when any of the input + # promises are resolved. + # + # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise + # @return [::Libuv::Q::Promise] Returns a single promise + def any(*promises) + Q.any(@loop, *promises) + end + + # + # Combines multiple promises into a single promise that is resolved when all of the input + # promises are resolved or rejected. + # + # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise + # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values, + # each [result, wasResolved] value pair corresponding to a at the same index in the `promises` array. + def finally(*promises) + Q.finally(@loop, *promises) + end + + + # forces loop time update, useful for getting more granular times + # + # @return nil + def update_time + ::Libuv::Ext.update_time(@pointer) + end + + # Get current time in milliseconds + # + # @return [Fixnum] + def now + ::Libuv::Ext.now(@pointer) + end + + # Lookup an error code and return is as an error object + # + # @param err [Integer] The error code to look up. + # @return [::Libuv::Error] + def lookup_error(err) + name = ::Libuv::Ext.err_name(err) + msg = ::Libuv::Ext.strerror(err) + + ::Libuv::Error.const_get(name.to_sym).new(msg) + rescue Exception => e + @loop.log :warn, :error_lookup_failed, e + ::Libuv::Error::UNKNOWN.new("error lookup failed for code #{err} #{name} #{msg}") + end + + # Get a new TCP instance + # + # @return [::Libuv::TCP] + def tcp + TCP.new(@loop) + end + + # Get a new UDP instance + # + # @return [::Libuv::UDP] + def udp + UDP.new(@loop) + end + + # Get a new TTY instance + # + # @param fileno [Integer] Integer file descriptor of a tty device + # @param readable [true, false] Boolean indicating if TTY is readable + # @return [::Libuv::TTY] + def tty(fileno, readable = false) + assert_type(Integer, fileno, "io#fileno must return an integer file descriptor, #{fileno.inspect} given") + + TTY.new(@loop, fileno, readable) + end + + # Get a new Pipe instance + # + # @param ipc [true, false] indicate if a handle will be used for ipc, useful for sharing tcp socket between processes + # @return [::Libuv::Pipe] + def pipe(ipc = false) + Pipe.new(@loop, ipc) + end + + # Get a new timer instance + # + # @param callback [Proc] the callback to be called on timer trigger + # @return [::Libuv::Timer] + def timer(callback = nil, &blk) + Timer.new(@loop, callback || blk) + end + + # Get a new Prepare handle + # + # @return [::Libuv::Prepare] + def prepare(callback = nil, &blk) + Prepare.new(@loop, callback || blk) + end + + # Get a new Check handle + # + # @return [::Libuv::Check] + def check(callback = nil, &blk) + Check.new(@loop, callback || blk) + end + + # Get a new Idle handle + # + # @param callback [Proc] the callback to be called on idle trigger + # @return [::Libuv::Idle] + def idle(callback = nil, &block) + Idle.new(@loop, callback || block) + end + + # Get a new Async handle + # + # @return [::Libuv::Async] + def async(callback = nil, &block) + callback ||= block + handle = Async.new(@loop) + handle.progress callback if callback + handle + end + + # Get a new signal handler + # + # @return [::Libuv::Signal] + def signal(signum = nil, callback = nil, &block) + callback ||= block + handle = Signal.new(@loop) + handle.progress callback if callback + handle.start(signum) if signum + handle + end + + # Queue some work for processing in the libuv thread pool + # + # @param callback [Proc] the callback to be called in the thread pool + # @return [::Libuv::Work] + # @raise [ArgumentError] if block is not given + def work(callback = nil, &block) + callback ||= block + assert_block(callback) + Work.new(@loop, callback) # Work is a promise object + end + + # Lookup a hostname + # + # @param hostname [String] the domain name to lookup + # @param port [Integer, String] the service being connected too + # @param callback [Proc] the callback to be called on success + # @return [::Libuv::Dns] + def lookup(hostname, hint = :IPv4, port = 9, &block) + dns = Dns.new(@loop, hostname, port, hint) # Work is a promise object + dns.then block if block_given? + dns + end + + # Get a new FSEvent instance + # + # @param path [String] the path to the file or folder for watching + # @return [::Libuv::FSEvent] + # @raise [ArgumentError] if path is not a string + def fs_event(path) + assert_type(String, path) + FSEvent.new(@loop, path) + end + + # Opens a file and returns an object that can be used to manipulate it + # + # @param path [String] the path to the file or folder for watching + # @param flags [Integer] see ruby File::Constants + # @param mode [Integer] + # @return [::Libuv::File] + def file(path, flags = 0, mode = 0) + assert_type(String, path, "path must be a String") + assert_type(Integer, flags, "flags must be an Integer") + assert_type(Integer, mode, "mode must be an Integer") + File.new(@loop, path, flags, mode) + end + + # Returns an object for manipulating the filesystem + # + # @return [::Libuv::Filesystem] + def filesystem + Filesystem.new(@loop) + end + + # Schedule some work to be processed on the event loop as soon as possible (thread safe) + # + # @param callback [Proc] the callback to be called on the reactor thread + # @raise [ArgumentError] if block is not given + def schedule(callback = nil, &block) + callback ||= block + assert_block(callback) + + if reactor_thread? + block.call + else + @run_queue << callback + @process_queue.call + end + end + + # Queue some work to be processed in the next iteration of the event loop (thread safe) + # + # @param callback [Proc] the callback to be called on the reactor thread + # @raise [ArgumentError] if block is not given + def next_tick(callback = nil, &block) + callback ||= block + assert_block(callback) + + @run_queue << callback + if reactor_thread? + # Create a next tick timer + if not @next_tick_scheduled + @next_tick.start(0) + @next_tick_scheduled = true + end + else + @process_queue.call + end + end + + # Notifies the loop there was an event that should be logged + # + # @param level [Symbol] the error level (info, warn, error etc) + # @param id [Object] some kind of identifying information + # @param *args [*args] any additional information + def log(level, id, *args) + @loop_notify.notify(level, id, *args) + end + + # Closes handles opened by the loop class and completes the current loop iteration (thread safe) + def stop + @stop_loop.call + end + + # True if the calling thread is the same thread as the reactor. + # + # @return [Boolean] + def reactor_thread? + @reactor_thread == Thread.current + end + + # Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endevor). + # + # @return [Thread] + def reactor_thread + @reactor_thread + end + + # Tells you whether the Libuv reactor loop is currently running. + # + # @return [Boolean] + def reactor_running? + !@reactor_thread.nil? + end + end +end