Sha256: d913532ecb7172ad148ad206d234b51aa85ae884b92a63c4cd954685471d8b2e

Contents?: true

Size: 1.93 KB

Versions: 5

Compression:

Stored size: 1.93 KB

Contents

require 'rosetta_queue/consumer_managers/base'

module RosettaQueue
  # Consumer manager that runs each consumer's receive loop in its own thread
  # and supervises those threads from the thread that called #start.
  #
  # NOTE(review): @consumers is read here but never assigned in this class;
  # presumably BaseManager populates it as a key => consumer Hash -- confirm.
  class ThreadedManager < BaseManager

    def initialize
      @threads = {}   # consumer key => Thread running that consumer
      @running = true # shared flag polled by the worker and monitor loops
      super
    end

    # Spawns one thread per consumer, then blocks in the monitor loop until
    # every thread has died or the process is interrupted.
    #
    # There is deliberately no Thread#join before monitoring. The previous
    # join step iterated the @threads Hash with a single block parameter, so
    # it called Array#join on each [key, thread] pair and never joined a
    # thread. A real blocking join here would prevent monitor_threads (and
    # its cleanup handling) from ever being reached.
    def start
      start_threads
      monitor_threads
    end

    # Signals the worker loops to finish, disconnects every consumer and
    # kills its thread.
    def stop
      stop_threads
    end

  private

    # Polls the worker threads once per second until none is alive, then
    # reports that they have all died. Whatever ends the loop (normal exit,
    # Interrupt, or any other exception), stop_threads runs afterwards.
    def monitor_threads
      # The handler does not change between iterations, so install it once
      # instead of on every pass through the loop.
      trap("TERM", "EXIT")

      while @running
        @running = @threads.any? { |_name, thread| thread.alive? }
        sleep 1
      end

      puts "All connection threads have died..."
    rescue Interrupt
      RosettaQueue.logger.warn("Interrupt received.  Shutting down...")
      puts "\nInterrupt received\n"
    rescue Exception => e
      # NOTE(review): Exception is broader than StandardError (it includes
      # SystemExit and SignalException). Kept as-is so that the set of
      # exceptions escaping #start does not change.
      RosettaQueue.logger.error("Exception thrown -- #{e.class.name}: #{e.message}")
    ensure
      RosettaQueue.logger.warn("Cleaning up threads...")
      stop_threads
    end

    # Starts one thread per entry in @consumers and records it in @threads
    # under the same key. Each thread calls consumer.receive repeatedly while
    # @running is true; errors are logged and the loop continues.
    def start_threads
      @consumers.each do |key, consumer|
        @threads[key] = Thread.new(key, consumer) do |a_key, a_consumer|
          while @running
            begin
              RosettaQueue.logger.info("Threading consumer #{a_consumer}...")
              # Called directly: the former `Mutex.new.synchronize` wrapper
              # built a brand-new lock on every call, so it could never
              # exclude another thread.
              a_consumer.receive
            rescue StopProcessingException
              RosettaQueue.logger.error("#{a_key}: Processing Stopped - receive interrupted")
            rescue Exception => e
              # NOTE(review): deliberately broad so that a failing receive
              # never ends the worker loop; narrowing it would change which
              # errors terminate the thread.
              RosettaQueue.logger.error("#{a_key}: Exception from connection.receive: #{e}\n" + Array(e.backtrace).join("\n\t"))
            end
            Thread.pass
          end
        end
      end
    end

    # Clears the running flag, then disconnects each consumer and kills its
    # thread. Consumers are handled independently: a failure while
    # disconnecting one is logged, its thread is still killed, and the
    # remaining consumers are still processed.
    def stop_threads
      @running = false
      @threads.each do |key, thread|
        begin
          RosettaQueue.logger.info("Stopping thread and disconnecting from #{key}...")
          @consumers[key].disconnect
        rescue StandardError => e
          RosettaQueue.logger.error("#{key}: Exception while disconnecting -- #{e.class.name}: #{e.message}")
        ensure
          thread.kill
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
cwyckoff-rosetta_queue-0.1.4 lib/rosetta_queue/consumer_managers/threaded.rb
cwyckoff-rosetta_queue-0.2.0 lib/rosetta_queue/consumer_managers/threaded.rb
cwyckoff-rosetta_queue-0.2.1 lib/rosetta_queue/consumer_managers/threaded.rb
cwyckoff-rosetta_queue-0.2.2 lib/rosetta_queue/consumer_managers/threaded.rb
cwyckoff-rosetta_queue-0.3.0 lib/rosetta_queue/consumer_managers/threaded.rb