examples/00_manual_test.rb in amq-protocol-0.0.1.pre vs examples/00_manual_test.rb in amq-protocol-0.5.0

- old
+ new

@@ -1,60 +1,139 @@ -# encoding: binary +#!/usr/bin/env ruby +# encoding: utf-8 +# Usage: +# ./examples/00_manual_test.rb +# ./examples/00_manual_test.rb 5673 + require "socket" -require_relative "../lib/amq/protocol.rb" +require_relative "../lib/amq/protocol/client.rb" include AMQ::Protocol -socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) -sockaddr = Socket.pack_sockaddr_in((ARGV.first || 5672).to_i, "127.0.0.1") # NOTE: this doesn't work with "localhost", I don't know why. +# Stolen from amq-client: +class AMQ::Protocol::Frame + def self.decode(io) + header = io.read(7) + type, channel, size = self.decode_header(header) + data = io.read(size + 1) + payload, frame_end = data[0..-2], data[-1] -begin - socket.connect(sockaddr) -rescue Errno::ECONNREFUSED - abort "Don't forget to start an AMQP broker first!" + if frame_end != FINAL_OCTET + raise "Frame has to end with #{FINAL_OCTET.inspect}!" + end + + self.new(type, payload, channel) + end end -# helpers -MethodFrame.encode(:method, 0, Connection::TuneOk.encode(0, 131072, 0)) +# NOTE: this doesn't work with "localhost", I don't know why: +socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) +sockaddr = Socket.pack_sockaddr_in((ARGV.first || 5672).to_i, "127.0.0.1") +# helpers def socket.encode(klass, *args) STDERR.puts "#{klass}.encode(#{args.inspect[1..-2]})" - klass.encode(*args).tap do |result| - STDERR.puts "=> #{result.inspect}" - STDERR.puts "MethodFrame.encode(:method, 0, #{result.inspect})" - STDERR.puts "=> #{MethodFrame.encode(:method, 0, result).inspect}\n\n" - self.write(MethodFrame.encode(:method, 0, result)) + result = klass.encode(*args) + STDERR.puts "=> #{result}" + if result.is_a?(Frame) + self.write(result.encode) + else + data = result.inject("") do |buffer, frame| + self.write(frame.encode) ### + buffer += frame.encode + end + # self.write(data) end end def socket.decode - frame = MethodFrame.decode(self) - STDERR.puts "MethodFrame.decode(#{self.inspect})" - STDERR.puts "=> #{frame.inspect}\n\n" + frame = Frame.decode(self) + STDERR.puts "Frame.decode(#{self.inspect})" + STDERR.puts "=> #{frame.inspect}" + STDERR.puts "frame.decode_payload" + STDERR.puts "=> #{res = frame.decode_payload}\n\n" + return res end -# AMQP preamble -puts "Sending AMQP preamble (#{AMQ::Protocol::PREAMBLE.inspect})\n\n" -socket.write AMQ::Protocol::PREAMBLE +begin + socket.connect(sockaddr) +rescue Errno::ECONNREFUSED + abort "Don't forget to start an AMQP broker first!" +end -# Start/Start-Ok -socket.decode -socket.encode Connection::StartOk, {client: "AMQ Protocol"}, "PLAIN", "guest\0guest\0", "en_GB" +begin + # AMQP preamble + puts "Sending AMQP preamble (#{AMQ::Protocol::PREAMBLE.inspect})\n\n" + socket.write AMQ::Protocol::PREAMBLE -# Tune/Tune-Ok -socket.decode -socket.encode Connection::TuneOk, 0, 131072, 0 + # Connection.Start/Connection.Start-Ok + connection_start_response = socket.decode + socket.encode Connection::StartOk, {client: "AMQ Protocol"}, "PLAIN", "\0guest\0guest", "en_GB" -# Close -# socket.encode Connection::Close -# socket.decode + # Connection.Tune/Connection.Tune-Ok + connection_tune_response = socket.decode + channel_max = connection_tune_response.channel_max + frame_max = connection_tune_response.frame_max + heartbeat = connection_tune_response.heartbeat + socket.encode Connection::TuneOk, channel_max, frame_max, heartbeat + puts "Max agreed frame size: #{frame_max}" -socket.close + # Connection.Open/Connection.Open-Ok + socket.encode Connection::Open, "/" + connection_open_ok_response = socket.decode -__END__ -[CLIENT] conn#4 ch#0 -> {#method<connection.start-ok>(client-properties={product=AMQP, information=http://github.com/tmm1/amqp, platform=Ruby/EventMachine, version=0.6.7},mechanism=AMQPLAIN,response=LOGINSguesPASSWORDSguest,locale=en_US),null,""} -[SERVER] conn#4 ch#0 <- {#method<connection.start>(version-major=8,version-minor=0,server properties={product=RabbitMQ, information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, copyright=Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd., version=2.1.0},mechanisms=PLAIN AMQPLAIN,locales=en_US),null,""} -[SERVER] conn#4 ch#0 <- {#method<connection.tune>(channel-max=0,frame-max=131072,heartbeat=0),null,""} -[CLIENT] conn#4 ch#0 -> {#method<connection.tune-ok>(channel-max=0,frame-max=131072,heartbeat=0),null,""} -[CLIENT] conn#4 ch#0 -> {#method<connection.open>(virtual-host=/,capabilities=,insist=false),null,""} -[SERVER] conn#4 ch#0 <- {#method<connection.open-ok>(known-hosts=),null,""} + begin + # Channel.Open/Channel.Open-Ok + socket.encode Channel::Open, 1, "" + channel_open_ok_response = socket.decode + + begin + # Exchange.Declare/Exchange.Declare-Ok + socket.encode Exchange::Declare, 1, "tasks", "fanout", false, false, false, false, false, {} + exchange_declare_ok_response = socket.decode + + # Queue.Declare/Queue.Declare-Ok + socket.encode Queue::Declare, 1, "", false, false, false, false, false, {} + queue_declare_ok_response = socket.decode + + puts "Queue name: #{queue_declare_ok_response.queue.inspect}" + + # Queue.Bind/Queue.Bind-Ok + socket.encode Queue::Bind, 1, queue_declare_ok_response.queue, "tasks", "", false, {} + queue_bind_ok_response = socket.decode + + # Basic.Consume + socket.encode Basic::Consume, 1, queue_declare_ok_response.queue, "", false, false, false, false, Hash.new + + # Basic.Publish + socket.encode Basic::Publish, 1, "this is a payload", {content_type: "text/plain"}, "tasks", "", false, false, frame_max + + # Basic.Consume-Ok + basic_consume_ok_response = socket.decode + puts "Consumed successfully, consumer tag: #{basic_consume_ok_response.consumer_tag}" + + # Basic.Deliver + basic_deliver = socket.decode + basic_deliver_header = socket.decode # header frame: {} + basic_deliver_body = socket.decode # body frame: "this is a payload" + puts "[Received] headers: #{basic_deliver_header.inspect}, payload: #{basic_deliver_body.inspect}" + ensure + # Channel.Close/Channel.Close-Ok + socket.encode Channel::Close, 1, 200, "bye", 0, 0 + channel_close_ok_response = socket.decode + end + ensure + # Connection.Close/Connection.Close-Ok + socket.encode Connection::Close, 200, "", 0, 0 + close_ok_response = socket.decode + end +rescue Exception => exception + STDERR.puts "\n\e[1;31m[#{exception.class}] #{exception.message}\e[0m" + exception.backtrace.each do |line| + line = "\e[0;36m#{line}\e[0m" if line.match(Regexp::quote(File.basename(__FILE__))) + STDERR.puts " - " + line + end + exit 1 # Yes, this works with the ensure block, even though the rescue block run first. Magic. +ensure + socket.close +end