lib/libuv/loop.rb in libuv-0.11.3 vs lib/libuv/loop.rb in libuv-0.11.4
- old
+ new
@@ -1,385 +1,387 @@
-require 'thread'
-
-module Libuv
- class Loop
- include Resource, Assertions
-
-
- LOOPS = ThreadSafe::Cache.new
-
-
- module ClassMethods
- # Get default loop
- #
- # @return [::Libuv::Loop]
- def default
- return current || create(::Libuv::Ext.default_loop)
- end
-
- # Create new Libuv loop
- #
- # @return [::Libuv::Loop]
- def new
- return current || create(::Libuv::Ext.loop_new)
- end
-
- # Build a Ruby Libuv loop from an existing loop pointer
- #
- # @return [::Libuv::Loop]
- def create(pointer)
- allocate.tap { |i| i.send(:initialize, FFI::AutoPointer.new(pointer, ::Libuv::Ext.method(:loop_delete))) }
- end
-
- # Checks for the existence of a loop on the current thread
- #
- # @return [::Libuv::Loop | nil]
- def current
- LOOPS[Thread.current]
- end
- end
- extend ClassMethods
-
-
- # Initialize a loop using an FFI::Pointer to a libuv loop
- def initialize(pointer) # :notnew:
- @pointer = pointer
- @loop = self
-
- # Create an async call for scheduling work from other threads
- @run_queue = Queue.new
- @queue_proc = proc do
- # ensure we only execute what was required for this tick
- length = @run_queue.length
- length.times do
- begin
- run = @run_queue.pop true # pop non-block
- run.call
- rescue Exception => e
- @loop.log :error, :next_tick_cb, e
- end
- end
- end
- @process_queue = Async.new(@loop, @queue_proc)
-
- # Create a next tick timer
- @next_tick = @loop.timer do
- @next_tick_scheduled = false
- @queue_proc.call
- end
-
- # Create an async call for ending the loop
- @stop_loop = Async.new @loop do
- LOOPS.delete(@reactor_thread)
- @reactor_thread = nil
- @process_queue.close
- @stop_loop.close
- @next_tick.close
-
- ::Libuv::Ext.stop(@pointer)
- end
- end
-
- def handle; @pointer; end
-
- # Run the actual event loop. This method will block until the loop is stopped.
- #
- # @param run_type [:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT]
- # @yieldparam promise [::Libuv::Q::Promise] Yields a promise that can be used for logging unhandled
- # exceptions on the loop.
- def run(run_type = :UV_RUN_DEFAULT)
- if @reactor_thread.nil?
- @loop_notify = @loop.defer
-
- begin
- @reactor_thread = Thread.current
- LOOPS[@reactor_thread] = @loop
- yield @loop_notify.promise if block_given?
- ::Libuv::Ext.run(@pointer, run_type) # This is blocking
- ensure
- @reactor_thread = nil
- @run_queue.clear
- end
- end
- @loop
- end
-
-
- # Creates a deferred result object for where the result of an operation may only be returned
- # at some point in the future or is being processed on a different thread (thread safe)
- #
- # @return [::Libuv::Q::Deferred]
- def defer
- Q.defer(@loop)
- end
-
- # Combines multiple promises into a single promise that is resolved when all of the input
- # promises are resolved. (thread safe)
- #
- # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise
- # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values,
- # each value corresponding to the promise at the same index in the `promises` array. If any of
- # the promises is resolved with a rejection, this resulting promise will be resolved with the
- # same rejection.
- def all(*promises)
- Q.all(@loop, *promises)
- end
-
- #
- # Combines multiple promises into a single promise that is resolved when any of the input
- # promises are resolved.
- #
- # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise
- # @return [::Libuv::Q::Promise] Returns a single promise
- def any(*promises)
- Q.any(@loop, *promises)
- end
-
- #
- # Combines multiple promises into a single promise that is resolved when all of the input
- # promises are resolved or rejected.
- #
- # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise
- # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values,
- # each [result, wasResolved] value pair corresponding to a at the same index in the `promises` array.
- def finally(*promises)
- Q.finally(@loop, *promises)
- end
-
-
- # forces loop time update, useful for getting more granular times
- #
- # @return nil
- def update_time
- ::Libuv::Ext.update_time(@pointer)
- end
-
- # Get current time in milliseconds
- #
- # @return [Fixnum]
- def now
- ::Libuv::Ext.now(@pointer)
- end
-
- # Lookup an error code and return is as an error object
- #
- # @param err [Integer] The error code to look up.
- # @return [::Libuv::Error]
- def lookup_error(err)
- name = ::Libuv::Ext.err_name(err)
- msg = ::Libuv::Ext.strerror(err)
-
- ::Libuv::Error.const_get(name.to_sym).new(msg)
- rescue Exception => e
- @loop.log :warn, :error_lookup_failed, e
- ::Libuv::Error::UNKNOWN.new("error lookup failed for code #{err} #{name} #{msg}")
- end
-
- # Get a new TCP instance
- #
- # @return [::Libuv::TCP]
- def tcp
- TCP.new(@loop)
- end
-
- # Get a new UDP instance
- #
- # @return [::Libuv::UDP]
- def udp
- UDP.new(@loop)
- end
-
- # Get a new TTY instance
- #
- # @param fileno [Integer] Integer file descriptor of a tty device
- # @param readable [true, false] Boolean indicating if TTY is readable
- # @return [::Libuv::TTY]
- def tty(fileno, readable = false)
- assert_type(Integer, fileno, "io#fileno must return an integer file descriptor, #{fileno.inspect} given")
-
- TTY.new(@loop, fileno, readable)
- end
-
- # Get a new Pipe instance
- #
- # @param ipc [true, false] indicate if a handle will be used for ipc, useful for sharing tcp socket between processes
- # @return [::Libuv::Pipe]
- def pipe(ipc = false)
- Pipe.new(@loop, ipc)
- end
-
- # Get a new timer instance
- #
- # @param callback [Proc] the callback to be called on timer trigger
- # @return [::Libuv::Timer]
- def timer(callback = nil, &blk)
- Timer.new(@loop, callback || blk)
- end
-
- # Get a new Prepare handle
- #
- # @return [::Libuv::Prepare]
- def prepare(callback = nil, &blk)
- Prepare.new(@loop, callback || blk)
- end
-
- # Get a new Check handle
- #
- # @return [::Libuv::Check]
- def check(callback = nil, &blk)
- Check.new(@loop, callback || blk)
- end
-
- # Get a new Idle handle
- #
- # @param callback [Proc] the callback to be called on idle trigger
- # @return [::Libuv::Idle]
- def idle(callback = nil, &block)
- Idle.new(@loop, callback || block)
- end
-
- # Get a new Async handle
- #
- # @return [::Libuv::Async]
- def async(callback = nil, &block)
- callback ||= block
- handle = Async.new(@loop)
- handle.progress callback if callback
- handle
- end
-
- # Get a new signal handler
- #
- # @return [::Libuv::Signal]
- def signal(signum = nil, callback = nil, &block)
- callback ||= block
- handle = Signal.new(@loop)
- handle.progress callback if callback
- handle.start(signum) if signum
- handle
- end
-
- # Queue some work for processing in the libuv thread pool
- #
- # @param callback [Proc] the callback to be called in the thread pool
- # @return [::Libuv::Work]
- # @raise [ArgumentError] if block is not given
- def work(callback = nil, &block)
- callback ||= block
- assert_block(callback)
- Work.new(@loop, callback) # Work is a promise object
- end
-
- # Lookup a hostname
- #
- # @param hostname [String] the domain name to lookup
- # @param port [Integer, String] the service being connected too
- # @param callback [Proc] the callback to be called on success
- # @return [::Libuv::Dns]
- def lookup(hostname, hint = :IPv4, port = 9, &block)
- dns = Dns.new(@loop, hostname, port, hint) # Work is a promise object
- dns.then block if block_given?
- dns
- end
-
- # Get a new FSEvent instance
- #
- # @param path [String] the path to the file or folder for watching
- # @return [::Libuv::FSEvent]
- # @raise [ArgumentError] if path is not a string
- def fs_event(path)
- assert_type(String, path)
- FSEvent.new(@loop, path)
- end
-
- # Opens a file and returns an object that can be used to manipulate it
- #
- # @param path [String] the path to the file or folder for watching
- # @param flags [Integer] see ruby File::Constants
- # @param mode [Integer]
- # @return [::Libuv::File]
- def file(path, flags = 0, mode = 0)
- assert_type(String, path, "path must be a String")
- assert_type(Integer, flags, "flags must be an Integer")
- assert_type(Integer, mode, "mode must be an Integer")
- File.new(@loop, path, flags, mode)
- end
-
- # Returns an object for manipulating the filesystem
- #
- # @return [::Libuv::Filesystem]
- def filesystem
- Filesystem.new(@loop)
- end
-
- # Schedule some work to be processed on the event loop as soon as possible (thread safe)
- #
- # @param callback [Proc] the callback to be called on the reactor thread
- # @raise [ArgumentError] if block is not given
- def schedule(callback = nil, &block)
- callback ||= block
- assert_block(callback)
-
- if reactor_thread?
- block.call
- else
- @run_queue << callback
- @process_queue.call
- end
- end
-
- # Queue some work to be processed in the next iteration of the event loop (thread safe)
- #
- # @param callback [Proc] the callback to be called on the reactor thread
- # @raise [ArgumentError] if block is not given
- def next_tick(callback = nil, &block)
- callback ||= block
- assert_block(callback)
-
- @run_queue << callback
- if reactor_thread?
- # Create a next tick timer
- if not @next_tick_scheduled
- @next_tick.start(0)
- @next_tick_scheduled = true
- end
- else
- @process_queue.call
- end
- end
-
- # Notifies the loop there was an event that should be logged
- #
- # @param level [Symbol] the error level (info, warn, error etc)
- # @param id [Object] some kind of identifying information
- # @param *args [*args] any additional information
- def log(level, id, *args)
- @loop_notify.notify(level, id, *args)
- end
-
- # Closes handles opened by the loop class and completes the current loop iteration (thread safe)
- def stop
- @stop_loop.call
- end
-
- # True if the calling thread is the same thread as the reactor.
- #
- # @return [Boolean]
- def reactor_thread?
- @reactor_thread == Thread.current
- end
-
- # Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endevor).
- #
- # @return [Thread]
- def reactor_thread
- @reactor_thread
- end
-
- # Tells you whether the Libuv reactor loop is currently running.
- #
- # @return [Boolean]
- def reactor_running?
- !@reactor_thread.nil?
- end
- end
-end
+require 'thread'
+
+module Libuv
+ class Loop
+ include Resource, Assertions
+
+
+ LOOPS = ThreadSafe::Cache.new
+
+
+ module ClassMethods
+ # Get default loop
+ #
+ # @return [::Libuv::Loop]
+ def default
+ return @default ||= create(::Libuv::Ext.default_loop)
+ end
+
+ # Create new Libuv loop
+ #
+ # @return [::Libuv::Loop]
+ def new
+ return create(::Libuv::Ext.loop_new)
+ end
+
+ # Build a Ruby Libuv loop from an existing loop pointer
+ #
+ # @return [::Libuv::Loop]
+ def create(pointer)
+ allocate.tap { |i| i.send(:initialize, FFI::AutoPointer.new(pointer, ::Libuv::Ext.method(:loop_delete))) }
+ end
+
+ # Checks for the existence of a loop on the current thread
+ #
+ # @return [::Libuv::Loop | nil]
+ def current
+ LOOPS[Thread.current]
+ end
+ end
+ extend ClassMethods
+
+
+ # Initialize a loop using an FFI::Pointer to a libuv loop
+ def initialize(pointer) # :notnew:
+ @pointer = pointer
+ @loop = self
+
+ # Create an async call for scheduling work from other threads
+ @run_queue = Queue.new
+ @queue_proc = proc do
+ # ensure we only execute what was required for this tick
+ length = @run_queue.length
+ length.times do
+ begin
+ run = @run_queue.pop true # pop non-block
+ run.call
+ rescue Exception => e
+ @loop.log :error, :next_tick_cb, e
+ end
+ end
+ end
+ @process_queue = Async.new(@loop, @queue_proc)
+
+ # Create a next tick timer
+ @next_tick = @loop.timer do
+ @next_tick_scheduled = false
+ @queue_proc.call
+ end
+
+ # Create an async call for ending the loop
+ @stop_loop = Async.new @loop do
+ LOOPS.delete(@reactor_thread)
+ @reactor_thread = nil
+ @process_queue.close
+ @stop_loop.close
+ @next_tick.close
+
+ ::Libuv::Ext.stop(@pointer)
+ end
+ end
+
+ def handle; @pointer; end
+
+ # Run the actual event loop. This method will block until the loop is stopped.
+ #
+ # @param run_type [:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT]
+ # @yieldparam promise [::Libuv::Q::Promise] Yields a promise that can be used for logging unhandled
+ # exceptions on the loop.
+ def run(run_type = :UV_RUN_DEFAULT)
+ if @reactor_thread.nil?
+ @loop_notify = @loop.defer
+
+ begin
+ @reactor_thread = Thread.current
+ LOOPS[@reactor_thread] = @loop
+ yield @loop_notify.promise if block_given?
+ ::Libuv::Ext.run(@pointer, run_type) # This is blocking
+ ensure
+ @reactor_thread = nil
+ @run_queue.clear
+ end
+ elsif block_given?
+ schedule { yield @loop_notify.promise }
+ end
+ @loop
+ end
+
+
+ # Creates a deferred result object for where the result of an operation may only be returned
+ # at some point in the future or is being processed on a different thread (thread safe)
+ #
+ # @return [::Libuv::Q::Deferred]
+ def defer
+ Q.defer(@loop)
+ end
+
+ # Combines multiple promises into a single promise that is resolved when all of the input
+ # promises are resolved. (thread safe)
+ #
+ # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise
+ # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values,
+ # each value corresponding to the promise at the same index in the `promises` array. If any of
+ # the promises is resolved with a rejection, this resulting promise will be resolved with the
+ # same rejection.
+ def all(*promises)
+ Q.all(@loop, *promises)
+ end
+
+ #
+ # Combines multiple promises into a single promise that is resolved when any of the input
+ # promises are resolved.
+ #
+ # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise
+ # @return [::Libuv::Q::Promise] Returns a single promise
+ def any(*promises)
+ Q.any(@loop, *promises)
+ end
+
+ #
+ # Combines multiple promises into a single promise that is resolved when all of the input
+ # promises are resolved or rejected.
+ #
+ # @param *promises [::Libuv::Q::Promise] a number of promises that will be combined into a single promise
+ # @return [::Libuv::Q::Promise] Returns a single promise that will be resolved with an array of values,
+ # each [result, wasResolved] value pair corresponding to a at the same index in the `promises` array.
+ def finally(*promises)
+ Q.finally(@loop, *promises)
+ end
+
+
+ # forces loop time update, useful for getting more granular times
+ #
+ # @return nil
+ def update_time
+ ::Libuv::Ext.update_time(@pointer)
+ end
+
+ # Get current time in milliseconds
+ #
+ # @return [Fixnum]
+ def now
+ ::Libuv::Ext.now(@pointer)
+ end
+
+ # Lookup an error code and return is as an error object
+ #
+ # @param err [Integer] The error code to look up.
+ # @return [::Libuv::Error]
+ def lookup_error(err)
+ name = ::Libuv::Ext.err_name(err)
+ msg = ::Libuv::Ext.strerror(err)
+
+ ::Libuv::Error.const_get(name.to_sym).new(msg)
+ rescue Exception => e
+ @loop.log :warn, :error_lookup_failed, e
+ ::Libuv::Error::UNKNOWN.new("error lookup failed for code #{err} #{name} #{msg}")
+ end
+
+ # Get a new TCP instance
+ #
+ # @return [::Libuv::TCP]
+ def tcp
+ TCP.new(@loop)
+ end
+
+ # Get a new UDP instance
+ #
+ # @return [::Libuv::UDP]
+ def udp
+ UDP.new(@loop)
+ end
+
+ # Get a new TTY instance
+ #
+ # @param fileno [Integer] Integer file descriptor of a tty device
+ # @param readable [true, false] Boolean indicating if TTY is readable
+ # @return [::Libuv::TTY]
+ def tty(fileno, readable = false)
+ assert_type(Integer, fileno, "io#fileno must return an integer file descriptor, #{fileno.inspect} given")
+
+ TTY.new(@loop, fileno, readable)
+ end
+
+ # Get a new Pipe instance
+ #
+ # @param ipc [true, false] indicate if a handle will be used for ipc, useful for sharing tcp socket between processes
+ # @return [::Libuv::Pipe]
+ def pipe(ipc = false)
+ Pipe.new(@loop, ipc)
+ end
+
+ # Get a new timer instance
+ #
+ # @param callback [Proc] the callback to be called on timer trigger
+ # @return [::Libuv::Timer]
+ def timer(callback = nil, &blk)
+ Timer.new(@loop, callback || blk)
+ end
+
+ # Get a new Prepare handle
+ #
+ # @return [::Libuv::Prepare]
+ def prepare(callback = nil, &blk)
+ Prepare.new(@loop, callback || blk)
+ end
+
+ # Get a new Check handle
+ #
+ # @return [::Libuv::Check]
+ def check(callback = nil, &blk)
+ Check.new(@loop, callback || blk)
+ end
+
+ # Get a new Idle handle
+ #
+ # @param callback [Proc] the callback to be called on idle trigger
+ # @return [::Libuv::Idle]
+ def idle(callback = nil, &block)
+ Idle.new(@loop, callback || block)
+ end
+
+ # Get a new Async handle
+ #
+ # @return [::Libuv::Async]
+ def async(callback = nil, &block)
+ callback ||= block
+ handle = Async.new(@loop)
+ handle.progress callback if callback
+ handle
+ end
+
+ # Get a new signal handler
+ #
+ # @return [::Libuv::Signal]
+ def signal(signum = nil, callback = nil, &block)
+ callback ||= block
+ handle = Signal.new(@loop)
+ handle.progress callback if callback
+ handle.start(signum) if signum
+ handle
+ end
+
+ # Queue some work for processing in the libuv thread pool
+ #
+ # @param callback [Proc] the callback to be called in the thread pool
+ # @return [::Libuv::Work]
+ # @raise [ArgumentError] if block is not given
+ def work(callback = nil, &block)
+ callback ||= block
+ assert_block(callback)
+ Work.new(@loop, callback) # Work is a promise object
+ end
+
+ # Lookup a hostname
+ #
+ # @param hostname [String] the domain name to lookup
+ # @param port [Integer, String] the service being connected too
+ # @param callback [Proc] the callback to be called on success
+ # @return [::Libuv::Dns]
+ def lookup(hostname, hint = :IPv4, port = 9, &block)
+ dns = Dns.new(@loop, hostname, port, hint) # Work is a promise object
+ dns.then block if block_given?
+ dns
+ end
+
+ # Get a new FSEvent instance
+ #
+ # @param path [String] the path to the file or folder for watching
+ # @return [::Libuv::FSEvent]
+ # @raise [ArgumentError] if path is not a string
+ def fs_event(path)
+ assert_type(String, path)
+ FSEvent.new(@loop, path)
+ end
+
+ # Opens a file and returns an object that can be used to manipulate it
+ #
+ # @param path [String] the path to the file or folder for watching
+ # @param flags [Integer] see ruby File::Constants
+ # @param mode [Integer]
+ # @return [::Libuv::File]
+ def file(path, flags = 0, mode = 0)
+ assert_type(String, path, "path must be a String")
+ assert_type(Integer, flags, "flags must be an Integer")
+ assert_type(Integer, mode, "mode must be an Integer")
+ File.new(@loop, path, flags, mode)
+ end
+
+ # Returns an object for manipulating the filesystem
+ #
+ # @return [::Libuv::Filesystem]
+ def filesystem
+ Filesystem.new(@loop)
+ end
+
+ # Schedule some work to be processed on the event loop as soon as possible (thread safe)
+ #
+ # @param callback [Proc] the callback to be called on the reactor thread
+ # @raise [ArgumentError] if block is not given
+ def schedule(callback = nil, &block)
+ callback ||= block
+ assert_block(callback)
+
+ if reactor_thread?
+ block.call
+ else
+ @run_queue << callback
+ @process_queue.call
+ end
+ end
+
+ # Queue some work to be processed in the next iteration of the event loop (thread safe)
+ #
+ # @param callback [Proc] the callback to be called on the reactor thread
+ # @raise [ArgumentError] if block is not given
+ def next_tick(callback = nil, &block)
+ callback ||= block
+ assert_block(callback)
+
+ @run_queue << callback
+ if reactor_thread?
+ # Create a next tick timer
+ if not @next_tick_scheduled
+ @next_tick.start(0)
+ @next_tick_scheduled = true
+ end
+ else
+ @process_queue.call
+ end
+ end
+
+ # Notifies the loop there was an event that should be logged
+ #
+ # @param level [Symbol] the error level (info, warn, error etc)
+ # @param id [Object] some kind of identifying information
+ # @param *args [*args] any additional information
+ def log(level, id, *args)
+ @loop_notify.notify(level, id, *args)
+ end
+
+ # Closes handles opened by the loop class and completes the current loop iteration (thread safe)
+ def stop
+ @stop_loop.call
+ end
+
+ # True if the calling thread is the same thread as the reactor.
+ #
+ # @return [Boolean]
+ def reactor_thread?
+ @reactor_thread == Thread.current
+ end
+
+ # Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endevor).
+ #
+ # @return [Thread]
+ def reactor_thread
+ @reactor_thread
+ end
+
+ # Tells you whether the Libuv reactor loop is currently running.
+ #
+ # @return [Boolean]
+ def reactor_running?
+ !@reactor_thread.nil?
+ end
+ end
+end