Sha256: a83354d7a4d2f8e0f5db9a33e6cbeb688b57bd2b48e71bdd345432184c72d4c4

Contents?: true

Size: 1.15 KB

Versions: 11

Compression:

Stored size: 1.15 KB

Contents

require 'rbbt/util/concurrency/processes/socket'
# Namespace holding the forked-worker class below.
class RbbtProcessQueue
  # A worker backed by a forked child process. The child repeatedly pops an
  # item from +queue+, hands it to the block given at construction and, when
  # a callback queue was supplied, pushes the block's result onto it.
  class RbbtProcessQueueWorker
    attr_accessor :pid, :queue, :callback_queue, :block

    # Stores the arguments and immediately forks the worker process; the
    # child's pid is kept in +@pid+.
    #
    # queue::          must respond to +pop+ and expose +swrite+ (closed in
    #                  the child before the loop starts).
    # callback_queue:: optional; must respond to +push+ and expose +sread+
    #                  (closed in the child). Results and unexpected
    #                  exceptions are pushed onto it.
    # block::          called with every item popped from +queue+.
    #
    # Child exit status: 0 when +ClosedStream+ is raised, -1 when +Aborted+
    # or any other exception is raised.
    def initialize(queue, callback_queue = nil, &block)
      @queue, @callback_queue, @block = queue, callback_queue, block

      @pid = Process.fork do
        begin
          # Presumably these are the pipe ends the child does not use.
          @queue.swrite.close
          @callback_queue.sread.close if @callback_queue

          # #abort sends INT; turn it into an exception so the rescue chain
          # below decides the exit status.
          Signal.trap(:INT) { raise Aborted }

          loop do
            task = @queue.pop
            # An exception object received through the queue is re-raised here.
            raise task if Exception === task
            result = @block.call task
            @callback_queue.push result if @callback_queue
          end

          # NOTE(review): only reachable when the loop is ended by
          # StopIteration; the SystemExit raised here is then caught by the
          # `rescue Exception` clause below. Confirm that is intended.
          exit 0
        rescue ClosedStream
          exit 0
        rescue Aborted
          exit(-1)
        rescue Exception
          # Report the failure both to the log and to the callback consumer.
          Log.exception $!
          @callback_queue.push($!) if @callback_queue
          exit(-1)
        end
      end
    end

    # Blocks until the worker process exits.
    #
    # Returns the pid of the reaped child, or nil when the child had already
    # been reaped (for instance by an earlier #done? or #join), in which
    # case there is nothing left to wait for.
    def join
      Process.waitpid @pid
    rescue Errno::ECHILD
      nil
    end

    # Asks the worker to stop by sending it INT, which the child turns into
    # an +Aborted+ exception.
    #
    # Returns the result of Process.kill, or nil when the process no longer
    # exists.
    def abort
      Process.kill :INT, @pid
    rescue Errno::ESRCH
      nil
    end

    # Non-blocking check of whether the worker process has finished.
    #
    # Returns the pid (truthy) when the child has just been reaped, true
    # when it was already reaped, nil while it is still running, and false
    # if the check itself fails with any other StandardError.
    def done?
      Process.waitpid @pid, Process::WNOHANG
    rescue Errno::ECHILD
      true
    rescue
      false
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
rbbt-util-5.9.0 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.10 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.9 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.8 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.7 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.6 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.4 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.3 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.2 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.1 lib/rbbt/util/concurrency/processes/worker.rb
rbbt-util-5.8.0 lib/rbbt/util/concurrency/processes/worker.rb