Sha256: 89381abfc085ef2a84b7f52608ffc596f1ce1020079fbce19bb3cef2d2ae8e9d

Contents?: true

Size: 1.87 KB

Versions: 5

Compression:

Stored size: 1.87 KB

Contents

require "thread"

module Bunny
  # Network activity loop that reads incoming AMQP 0.9.1 frames from the
  # transport and hands them to the session for processing. They are dispatched
  # further down the line in Bunny::Session and Bunny::Channel.
  # The loop runs in a separate thread created by {#start}.
  #
  # This mimics the way RabbitMQ Java is designed quite closely.
  class MainLoop

    # @param transport [#read_next_frame] source of incoming frames
    # @param session [#signal_activity!, #handle_frame, #handle_frameset]
    #   receiver of activity notifications and decoded frames
    def initialize(transport, session)
      @transport = transport
      @session   = session
      # Set explicitly so #kill and #run_loop never read undefined state.
      @thread    = nil
      @stopping  = false
    end


    # Spawns the thread that executes {#run_loop}.
    #
    # @return [Thread] the newly created loop thread
    def start
      @thread    = Thread.new(&method(:run_loop))
    end

    # Reads and dispatches frames until {#stop} has been requested.
    #
    # The stop flag is only checked at the top of each iteration, so a loop
    # that is blocked reading a frame exits after that read returns or raises.
    # Errors raised while reading or dispatching never terminate the loop.
    def run_loop
      loop do
        begin
          break if @stopping

          run_once
        rescue Timeout::Error
          # given that the server may be pushing data to us, timeout detection/handling
          # should happen per operation and not in this loop
        rescue Errno::EBADF
          # ignored, happens when we loop after the transport has already been closed
        rescue Exception => e
          # Deliberately broad: anything else is reported on stdout and the
          # loop keeps running instead of letting the reader thread die.
          puts e.class.name
          puts e.message
          puts e.backtrace
        end
      end
    end

    # Requests loop termination; takes effect on the next iteration.
    def stop
      @stopping = true
    end

    # Forcefully terminates the loop thread and waits for it to finish.
    # Does nothing when the loop has never been started.
    #
    # @return [Thread, nil] the terminated thread, or nil if none was started
    def kill
      return unless @thread

      @thread.kill
      @thread.join
    end

    private

    # Reads one frame and dispatches it (together with its content header and
    # body frames, if it carries content) to the session.
    def run_once
      frame = @transport.read_next_frame
      @session.signal_activity!

      # Heartbeats only count as activity; there is nothing to dispatch.
      return if frame.is_a?(AMQ::Protocol::HeartbeatFrame)

      if !frame.final? || frame.method_class.has_content?
        header  = @transport.read_next_frame
        content = read_content(header)

        @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content])
      else
        @session.handle_frame(frame.channel, frame.decode_payload)
      end
    end

    # Accumulates body frame payloads until at least header.body_size bytes
    # have been collected. No body frame is read when body_size is zero.
    #
    # @param header [#body_size] content header frame
    # @return [String] concatenated body payloads
    def read_content(header)
      content = ''

      if header.body_size > 0
        loop do
          body_frame = @transport.read_next_frame
          content << body_frame.decode_payload

          break if content.bytesize >= header.body_size
        end
      end

      content
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
bunny-0.9.0.pre5 lib/bunny/main_loop.rb
bunny-0.9.0.pre4 lib/bunny/main_loop.rb
bunny-0.9.0.pre3 lib/bunny/main_loop.rb
bunny-0.9.0.pre2 lib/bunny/main_loop.rb
bunny-0.9.0.pre1 lib/bunny/main_loop.rb