Sha256: fc3205a89b433cb06e403fa5a8f60b50c757911b597c38879264ebeee6d5745d

Contents?: true

Size: 1.56 KB

Versions: 3

Compression:

Stored size: 1.56 KB

Contents

require 'rbbt/util/concurrency/processes/socket'
# Reopened here to define the worker class used by the process queue.
class RbbtProcessQueue
  # Handle on a forked child process that consumes jobs from a queue.
  #
  # The child pops jobs from +queue+, passes each one to the block given to
  # +new+ and, when a +callback_queue+ was supplied, pushes the block's return
  # value onto it. The parent keeps this object to wait for (+join+),
  # interrupt (+abort+) or poll (+done?+) the child.
  class RbbtProcessQueueWorker
    attr_reader :pid, :queue, :callback_queue, :cleanup, :block

    # Forks the worker process and stores its pid in +@pid+.
    #
    # queue          - job source; the child calls +close_write+ on it and then
    #                  +pop+s jobs in a loop.
    # callback_queue - optional result sink; must respond to +swrite+,
    #                  +close_read+, +push+ and +close_write+.
    # cleanup        - optional callable invoked first thing inside the child.
    # block          - invoked with each job splatted into its arguments.
    def initialize(queue, callback_queue = nil, cleanup = nil, &block)
      @queue, @callback_queue, @cleanup, @block = queue, callback_queue, cleanup, block

      @pid = Process.fork do
        begin
          @cleanup.call if @cleanup

          # The worker only pops from the job queue.
          @queue.close_write

          # NOTE(review): Misc.purge_pipes is defined elsewhere; presumably it
          # closes inherited pipes except the ones passed in -- confirm.
          if @callback_queue
            Misc.purge_pipes(@callback_queue.swrite)
            @callback_queue.close_read
          else
            Misc.purge_pipes
          end

          # #abort delivers INT; surface it as Aborted so it is handled below.
          Signal.trap(:INT) { raise Aborted }

          loop do
            job = @queue.pop

            # An exception arriving through the queue, either bare or as the
            # first element of a job, is re-raised inside the worker.
            raise job if Exception === job
            # Payloads that do not respond to #first (e.g. a bare Integer) are
            # handed to the block as they are instead of failing here.
            raise job.first if job.respond_to?(:first) && Exception === job.first

            result = @block.call(*job)
            @callback_queue.push(result) if @callback_queue
          end
        rescue ClosedStream
          # Presumably raised by the queue's #pop once the stream is closed
          # (confirm in the queue implementation). The worker simply leaves the
          # fork block and the child exits normally.
        rescue Aborted
          Log.error "Worker #{Process.pid} aborted"
          Kernel.exit!(-1)
        rescue Exception => e
          # Top-level boundary of the child process: forward any failure to
          # the parent through the callback queue, then terminate.
          @callback_queue.push(e) if @callback_queue
          Kernel.exit!(-1)
        ensure
          # Not reached after exit!, which terminates the process immediately.
          @callback_queue.close_write if @callback_queue
        end
      end
    end

    # Blocks until the child exits.
    #
    # Returns the value of Process.waitpid, or nil when waiting raised a
    # StandardError (for instance because the child was already reaped).
    def join
      Process.waitpid(@pid)
    rescue StandardError
      nil
    end

    # Sends INT to the child. Best effort: any StandardError raised while
    # signalling (e.g. the process no longer exists) is swallowed.
    def abort
      Process.kill(:INT, @pid)
    rescue StandardError
      nil
    end

    # Non-blocking check on the child.
    #
    # Returns true when the child has exited (this call reaps it) or when
    # there is no such child left to wait for, false while it is still running
    # or when the check itself failed for another reason.
    def done?
      # With WNOHANG, waitpid returns nil while the child is still running.
      !Process.waitpid(@pid, Process::WNOHANG).nil?
    rescue Errno::ECHILD
      true
    rescue StandardError
      false
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
rbbt-util-5.11.3 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.11.2 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.11.1 lib/rbbt/util/concurrency/processes/worker.rb