examples/00_manual_test.rb in amq-protocol-0.0.1.pre vs examples/00_manual_test.rb in amq-protocol-0.5.0
- old
+ new
@@ -1,60 +1,139 @@
-# encoding: binary
+#!/usr/bin/env ruby
+# encoding: utf-8
+# Usage:
+# ./examples/00_manual_test.rb
+# ./examples/00_manual_test.rb 5673
+
require "socket"
-require_relative "../lib/amq/protocol.rb"
+require_relative "../lib/amq/protocol/client.rb"
include AMQ::Protocol
-socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
-sockaddr = Socket.pack_sockaddr_in((ARGV.first || 5672).to_i, "127.0.0.1") # NOTE: this doesn't work with "localhost", I don't know why.
+# Stolen from amq-client:
+class AMQ::Protocol::Frame
+ def self.decode(io)
+ header = io.read(7)
+ type, channel, size = self.decode_header(header)
+ data = io.read(size + 1)
+ payload, frame_end = data[0..-2], data[-1]
-begin
- socket.connect(sockaddr)
-rescue Errno::ECONNREFUSED
- abort "Don't forget to start an AMQP broker first!"
+ if frame_end != FINAL_OCTET
+ raise "Frame has to end with #{FINAL_OCTET.inspect}!"
+ end
+
+ self.new(type, payload, channel)
+ end
end
-# helpers
-MethodFrame.encode(:method, 0, Connection::TuneOk.encode(0, 131072, 0))
+# NOTE: this doesn't work with "localhost", I don't know why:
+socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
+sockaddr = Socket.pack_sockaddr_in((ARGV.first || 5672).to_i, "127.0.0.1")
+# helpers
def socket.encode(klass, *args)
STDERR.puts "#{klass}.encode(#{args.inspect[1..-2]})"
- klass.encode(*args).tap do |result|
- STDERR.puts "=> #{result.inspect}"
- STDERR.puts "MethodFrame.encode(:method, 0, #{result.inspect})"
- STDERR.puts "=> #{MethodFrame.encode(:method, 0, result).inspect}\n\n"
- self.write(MethodFrame.encode(:method, 0, result))
+ result = klass.encode(*args)
+ STDERR.puts "=> #{result}"
+ if result.is_a?(Frame)
+ self.write(result.encode)
+ else
+ data = result.inject("") do |buffer, frame|
+ self.write(frame.encode) ###
+ buffer += frame.encode
+ end
+ # self.write(data)
end
end
def socket.decode
- frame = MethodFrame.decode(self)
- STDERR.puts "MethodFrame.decode(#{self.inspect})"
- STDERR.puts "=> #{frame.inspect}\n\n"
+ frame = Frame.decode(self)
+ STDERR.puts "Frame.decode(#{self.inspect})"
+ STDERR.puts "=> #{frame.inspect}"
+ STDERR.puts "frame.decode_payload"
+ STDERR.puts "=> #{res = frame.decode_payload}\n\n"
+ return res
end
-# AMQP preamble
-puts "Sending AMQP preamble (#{AMQ::Protocol::PREAMBLE.inspect})\n\n"
-socket.write AMQ::Protocol::PREAMBLE
+begin
+ socket.connect(sockaddr)
+rescue Errno::ECONNREFUSED
+ abort "Don't forget to start an AMQP broker first!"
+end
-# Start/Start-Ok
-socket.decode
-socket.encode Connection::StartOk, {client: "AMQ Protocol"}, "PLAIN", "guest\0guest\0", "en_GB"
+begin
+ # AMQP preamble
+ puts "Sending AMQP preamble (#{AMQ::Protocol::PREAMBLE.inspect})\n\n"
+ socket.write AMQ::Protocol::PREAMBLE
-# Tune/Tune-Ok
-socket.decode
-socket.encode Connection::TuneOk, 0, 131072, 0
+ # Connection.Start/Connection.Start-Ok
+ connection_start_response = socket.decode
+ socket.encode Connection::StartOk, {client: "AMQ Protocol"}, "PLAIN", "\0guest\0guest", "en_GB"
-# Close
-# socket.encode Connection::Close
-# socket.decode
+ # Connection.Tune/Connection.Tune-Ok
+ connection_tune_response = socket.decode
+ channel_max = connection_tune_response.channel_max
+ frame_max = connection_tune_response.frame_max
+ heartbeat = connection_tune_response.heartbeat
+ socket.encode Connection::TuneOk, channel_max, frame_max, heartbeat
+ puts "Max agreed frame size: #{frame_max}"
-socket.close
+ # Connection.Open/Connection.Open-Ok
+ socket.encode Connection::Open, "/"
+ connection_open_ok_response = socket.decode
-__END__
-[CLIENT] conn#4 ch#0 -> {#method<connection.start-ok>(client-properties={product=AMQP, information=http://github.com/tmm1/amqp, platform=Ruby/EventMachine, version=0.6.7},mechanism=AMQPLAIN,response=LOGINSguesPASSWORDSguest,locale=en_US),null,""}
-[SERVER] conn#4 ch#0 <- {#method<connection.start>(version-major=8,version-minor=0,server properties={product=RabbitMQ, information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, copyright=Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd., version=2.1.0},mechanisms=PLAIN AMQPLAIN,locales=en_US),null,""}
-[SERVER] conn#4 ch#0 <- {#method<connection.tune>(channel-max=0,frame-max=131072,heartbeat=0),null,""}
-[CLIENT] conn#4 ch#0 -> {#method<connection.tune-ok>(channel-max=0,frame-max=131072,heartbeat=0),null,""}
-[CLIENT] conn#4 ch#0 -> {#method<connection.open>(virtual-host=/,capabilities=,insist=false),null,""}
-[SERVER] conn#4 ch#0 <- {#method<connection.open-ok>(known-hosts=),null,""}
+ begin
+ # Channel.Open/Channel.Open-Ok
+ socket.encode Channel::Open, 1, ""
+ channel_open_ok_response = socket.decode
+
+ begin
+ # Exchange.Declare/Exchange.Declare-Ok
+ socket.encode Exchange::Declare, 1, "tasks", "fanout", false, false, false, false, false, {}
+ exchange_declare_ok_response = socket.decode
+
+ # Queue.Declare/Queue.Declare-Ok
+ socket.encode Queue::Declare, 1, "", false, false, false, false, false, {}
+ queue_declare_ok_response = socket.decode
+
+ puts "Queue name: #{queue_declare_ok_response.queue.inspect}"
+
+ # Queue.Bind/Queue.Bind-Ok
+ socket.encode Queue::Bind, 1, queue_declare_ok_response.queue, "tasks", "", false, {}
+ queue_bind_ok_response = socket.decode
+
+ # Basic.Consume
+ socket.encode Basic::Consume, 1, queue_declare_ok_response.queue, "", false, false, false, false, Hash.new
+
+ # Basic.Publish
+ socket.encode Basic::Publish, 1, "this is a payload", {content_type: "text/plain"}, "tasks", "", false, false, frame_max
+
+ # Basic.Consume-Ok
+ basic_consume_ok_response = socket.decode
+ puts "Consumed successfully, consumer tag: #{basic_consume_ok_response.consumer_tag}"
+
+ # Basic.Deliver
+ basic_deliver = socket.decode
+ basic_deliver_header = socket.decode # header frame: {}
+ basic_deliver_body = socket.decode # body frame: "this is a payload"
+ puts "[Received] headers: #{basic_deliver_header.inspect}, payload: #{basic_deliver_body.inspect}"
+ ensure
+ # Channel.Close/Channel.Close-Ok
+ socket.encode Channel::Close, 1, 200, "bye", 0, 0
+ channel_close_ok_response = socket.decode
+ end
+ ensure
+ # Connection.Close/Connection.Close-Ok
+ socket.encode Connection::Close, 200, "", 0, 0
+ close_ok_response = socket.decode
+ end
+rescue Exception => exception
+ STDERR.puts "\n\e[1;31m[#{exception.class}] #{exception.message}\e[0m"
+ exception.backtrace.each do |line|
+ line = "\e[0;36m#{line}\e[0m" if line.match(Regexp::quote(File.basename(__FILE__)))
+ STDERR.puts " - " + line
+ end
+ exit 1 # Yes, this works with the ensure block, even though the rescue block run first. Magic.
+ensure
+ socket.close
+end