Sha256: 84d5abeb6f99c921b4eb94a3ad28994c01d1a8ece52bd64b4f8c69b52af584f1

Contents?: true

Size: 1.69 KB

Versions: 18

Compression:

Stored size: 1.69 KB

Contents

require 'rbbt/util/concurrency/processes/socket'
class RbbtProcessQueue
  # Wraps one forked child process that repeatedly pops jobs from +queue+,
  # calls the supplied block with each job and, when a +callback_queue+ was
  # given, pushes the block's return value onto it.
  class RbbtProcessQueueWorker
    attr_reader :pid, :queue, :callback_queue, :cleanup, :block

    # Stores the collaborators and forks the worker process right away.
    #
    # queue          - job source; the child calls +close_write+ and +pop+ on it.
    # callback_queue - optional result sink; the child calls +swrite+,
    #                  +close_read+, +push+ and +close_write+ on it.
    # cleanup        - optional callable run in the child just after
    #                  Misc.pre_fork.
    # block          - invoked in the child with each popped job splatted
    #                  into its arguments.
    def initialize(queue, callback_queue = nil, cleanup = nil, &block)
      @queue, @callback_queue, @cleanup, @block = queue, callback_queue, cleanup, block

      # Exit status of the child, remembered by whichever of #join / #done?
      # reaps it first (a pid can only be waited on once).
      @status = nil

      @pid = Process.fork { run_worker }
    end

    # Blocks until the child has exited.
    #
    # Uses the exit status cached by an earlier #done? or #join when the child
    # was already reaped, instead of waiting on the pid a second time (which
    # would raise Errno::ECHILD).
    #
    # Raises ProcessFailed when the child exited unsuccessfully.
    # Raises Errno::ECHILD when the child was reaped by something other than
    # this object, since its status is then unknown.
    def join
      @status ||= begin
        Process.waitpid @pid
        $?
      end
      raise ProcessFailed unless @status.success?
    end

    # Sends INT to the child; the child's trap handler exits it with -1.
    # Best effort: any error while signalling (e.g. the process is already
    # gone) is deliberately ignored.
    def abort
      begin
        Process.kill :INT, @pid
      rescue
      end
    end

    # Non-blocking check of whether the child has exited.
    #
    # Returns true once the child has exited (or no longer exists as a child
    # of this process), false while it is still running or when the check
    # itself fails. When this call is the one that reaps the child, its exit
    # status is kept so that a later #join can still report it.
    def done?
      return true if @status

      begin
        reaped = Process.waitpid @pid, Process::WNOHANG
      rescue Errno::ECHILD
        return true
      rescue
        return false
      end

      # With WNOHANG, waitpid returns nil while the child is still running.
      return false unless reaped

      @status = $?
      true
    end

    private

    # Body of the forked child: prepares the inherited pipes, then serves
    # jobs until the queue is closed or an error occurs.
    def run_worker
      Misc.pre_fork

      @cleanup.call if @cleanup
      # The child only reads jobs.
      @queue.close_write

      # NOTE(review): Misc.purge_pipes presumably closes inherited pipe ends
      # other than the ones passed in -- confirm against Misc.
      if @callback_queue
        Misc.purge_pipes(@callback_queue.swrite)
        # The child only writes results.
        @callback_queue.close_read
      else
        Misc.purge_pipes
      end

      # #abort signals INT; leave immediately with a failure status.
      Signal.trap(:INT) { Kernel.exit!(-1) }

      loop do
        job = @queue.pop
        next if job.nil?
        # An exception travelling through the queue, bare or as the first
        # element of the job, is re-raised here in the worker.
        # NOTE(review): assumes every non-nil job responds to #first (e.g. an
        # argument Array) -- confirm against the queue implementation.
        raise job if Exception === job
        raise job.first if Exception === job.first
        result = @block.call(*job)
        @callback_queue.push result if @callback_queue
      end
      Kernel.exit!(0)
    rescue ClosedStream
      # Nothing to do: control falls through to the ensure clause and the fork
      # block returns, which ends the child with status 0.
    rescue Aborted, Interrupt
      Log.warn "Worker #{Process.pid} aborted"
      Kernel.exit!(0)
    rescue Exception => e
      # Catch-all on purpose: any failure in the child is logged, forwarded to
      # the parent through the callback queue when there is one, and turned
      # into a non-zero exit status that #join reports as ProcessFailed.
      Log.exception e
      @callback_queue.push(e) if @callback_queue
      Kernel.exit!(-1)
    ensure
      @callback_queue.close_write if @callback_queue
    end
  end
end

Version data entries

18 entries across 18 versions & 1 rubygems

Version Path
rbbt-util-5.14.18 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.17 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.16 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.15 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.14 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.12 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.11 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.10 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.9 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.8 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.7 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.6 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.5 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.4 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.3 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.2 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.1 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.14.0 lib/rbbt/util/concurrency/processes/worker.rb