require 'enumerator' require 'rubygems' require 'eventmachine' require 'amqp_spec' module AMQP class BufferOverflow < StandardError; end class InvalidFrame < StandardError; end module Protocol class Class::Method def initialize *args opts = args.pop if args.size == 1 and args.last.is_a? Hash opts ||= {} # XXX hack, p(obj) == '' if no instance vars are set @debug = 1 if args.size == 1 and args.first.is_a? Buffer buf = args.first else buf = nil end self.class.arguments.each do |type, name| if buf val = buf.parse(type) else val = args.shift || opts[name] || opts[name.to_s] end instance_variable_set("@#{name}", val) end end def convert_bits if @bits.size > 0 data = @bits.to_enum(:each_slice, 8).inject(''){|octet, list| octet + pack(:octet, list.enum_with_index.inject(0){ |byte, (bit, i)| byte |= 1<> 32 pack(:long, upper) + pack(:long, lower) when :bit data = if data.nil? or data == false or data == 0 0 else 1 end pack(:octet, data) end end end def self.parse payload buf = Buffer.new(payload) class_id, method_id = buf.parse(:short, :short) classes[class_id].methods[method_id].new(buf) end end class Frame def initialize type, channel, payload @channel = channel @type = (1..8).include?(type) ? TYPES[type] : TYPES.include?(type) ? type : raise(InvalidFrame) if @type == :METHOD and payload.is_a? String @payload = Protocol.parse(payload) else @payload = payload end end attr_reader :type, :channel, :payload def to_binary data = payload.to_s size = data.length # XXX the spec falsely claims there is another empty 'cycle' octet in this header [TYPES.index(type), channel, size, data, FOOTER].pack("CnNa*C") end alias :to_s :to_binary def == b [ :type, :channel, :payload ].inject(true){|eql, field| eql and __send__(field) == b.__send__(field) } end def self.extract data (@buffer ||= Buffer.new).extract(data.to_s) end end class Buffer def initialize data = '' @data = data @pos = 0 end attr_reader :pos def extract data = nil @data << data if data processed = 0 frames = [] while true type, channel, size = parse(:octet, :short, :long) payload = read(size) if read(1) == Frame::FOOTER.chr frames << Frame.new(type, channel, payload) else raise InvalidFrame end processed = @pos end rescue BufferOverflow # log 'buffer overflow', @pos, processed @data[0..processed] = '' if processed > 0 @pos = 0 frames end def parse *syms res = syms.map do |sym| # log 'parsing', sym case sym when :octet read(1, 'C') when :short read(2, 'n') when :long read(4, 'N') when :longlong # FIXME when :shortstr len = parse(:octet) read(len) when :longstr len = parse(:long) read(len) when :timestamp parse(:longlong) when :bit # FIXME when :table t = Hash.new table = Buffer.new(parse(:longstr)) until table.eof? key, type = table.parse(:shortstr, :octet) t[key] = case type when ?S table.parse(:longstr) when ?I table.parse(:long) when ?D d = table.parse(:octet) table.parse(:long) / (10**d) when ?T table.parse(:timestamp) when ?F table.parse(:table) end end t else # FIXME remove end end syms.length == 1 ? res[0] : res end def read len, type = nil # log 'reading', len, type, :pos => @pos raise BufferOverflow if @pos+len > @data.length d = @data.slice(@pos, len) @pos += len d = d.unpack(type).pop if type # log 'read', d d end def eof? @pos == @data.length end private def log *args p args end end module Connection def initialize blk @blk = blk end def connection_completed log 'connected' send_data HEADER send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('CCCC') end def receive_data data # log 'receive_data', data Frame.extract(data).each do |frame| log 'receive', frame case method = frame.payload when Protocol::Connection::Start send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine', :product => 'AMQP', :information => 'http://github.com/tmm1/amqp', :version => '0.0.1'}, 'AMQPLAIN', {:LOGIN => 'guest', :PASSWORD => 'guest'}, 'en_US') when Protocol::Connection::Tune send Protocol::Connection::TuneOk.new :channel_max => 0, :frame_max => 131072, :heartbeat => 0 send Protocol::Connection::Open.new :virtual_host => '/', :capabilities => '', :insist => false when Protocol::Connection::OpenOk send Protocol::Channel::Open.new, :channel => 1 when Protocol::Channel::OpenOk send Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, :active => true), :channel => 1 when Protocol::Access::RequestOk @ticket = method.ticket send Protocol::Queue::Declare.new(:ticket => @ticket, :queue => '', :exclusive => false, :auto_delete => true), :channel => 1 when Protocol::Queue::DeclareOk @queue = method.queue send Protocol::Queue::Bind.new(:ticket => @ticket, :queue => @queue, :exchange => '', :routing_key => 'test_route'), :channel => 1 when Protocol::Queue::BindOk send Protocol::Basic::Consume.new(:ticket => @ticket, :queue => @queue, :no_local => false, :no_ack => true), :channel => 1 when Protocol::Basic::ConsumeOk send Protocol::Basic::Publish.new(:ticket => @ticket, :exchange => '', :routing_key => 'test_route'), :channel => 1, :content => 'this is a test!' end end end def send data, opts = {} channel = opts[:channel] ||= 0 data = data.to_frame(channel) unless data.is_a? Frame log 'send', data send_data data.to_binary if content = opts[:content] size = content.length lower = size & 0xffffffff upper = (size & ~0xffffffff) >> 32 # XXX rabbitmq only supports weight == 0 data = [ 60, weight = 0, upper, lower, 0b1001_1000_0000_0000 ].pack("nnNNn") data += [ len = 'application/octet-stream'.length, 'application/octet-stream', 1, 1 ].pack("Ca*CC") send_data Frame.new(:HEADER, channel, data).to_binary # XXX spec says add FRAME_END, but rabbitmq doesn't support it # data = [ content, Frame::FOOTER ].pack('a*C') data = [ content ].pack("a*") send_data Frame.new(:BODY, channel, data).to_binary end end def send_data data log 'send_data', data super end def unbind log 'disconnected' end def self.start host = 'localhost', port = PORT, &blk EM.run{ EM.connect host, port, self, blk } end private def log *args pp args puts end end def self.start *args, &blk Connection.start *args, &blk end end require 'pp' if $0 == __FILE__ EM.run{ AMQP.start do |amqp| amqp.test.integer { |meth| meth }.send(0) amqp(1).on(:method) { } amqp(1).test.integer end } elsif $0 =~ /bacon/ include AMQP describe Frame do should 'convert to binary' do Frame.new(2, 0, 'abc').to_binary.should == "\002\000\000\000\000\000\003abc\316" end should 'return type as symbol' do Frame.new(3, 0, 'abc').type.should == :BODY Frame.new(:BODY, 0, 'abc').type.should == :BODY end should 'wrap Buffer#extract' do Frame.extract(frame = Frame.new(2,0,'abc')).first.should == frame end end describe Buffer do @frame = Frame.new(2, 0, 'abc') should 'parse complete frames' do frame = Buffer.new(@frame.to_binary).extract.first frame.should.be.kind_of? Frame frame.should.be == @frame end should 'not return incomplete frames until complete' do buffer = Buffer.new(@frame.to_binary[0..5]) buffer.extract.should == [] buffer.extract(@frame.to_binary[6..-1]).should == [@frame] buffer.extract.should == [] end end describe Protocol do @start = Protocol::Connection::Start.new(:locales => 'en_US', :mechanisms => 'PLAIN AMQPLAIN', :version_major => 8, :version_minor => 0, :server_properties => {'product' => 'RabbitMQ'}) @startok = Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine', :product => 'AMQP', :information => 'http://github.com/tmm1/amqp', :version => '0.0.1'}, 'PLAIN', {:LOGIN => 'guest', :PASSWORD => 'guest'}, 'en_US') should 'generate method packets' do meth = Protocol::Connection::StartOk.new :locale => 'en_US', :mechanism => 'PLAIN' meth.locale.should == @startok.locale end should 'generate method frames' do @startok.to_frame.should == Frame.new(:METHOD, 0, @startok) end should 'convert to and from binary' do Protocol.parse(@start.to_binary).should == @start end should 'convert to and from frames' do # XXX make this Frame.parse (refactor Buffer#extract) Buffer.new(@start.to_frame.to_binary).extract.first.payload.should == @start end end end __END__ ["connected"] ["receive", #"Erlang/OTP", "product"=>"RabbitMQ", "information"=>"Licensed under the MPL. See http://www.rabbitmq.com/", "version"=>"%%VERSION%%", "copyright"=> "Copyright (C) 2007-2008 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."}, @version_major=8, @version_minor=0>, @type=:METHOD>] ["send", #"AMQP", :information=>"http://github.com/tmm1/amqp", :platform=>"Ruby/EventMachine", :version=>"0.0.1"}, @debug=1, @locale="en_US", @mechanism="AMQPLAIN", @response={:PASSWORD=>"guest", :LOGIN=>"guest"}>, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["send", #, @type=:METHOD>] ["receive", #, @type=:METHOD>] ["receive", #] ["receive", #]