lib/amqp/client/frames.rb in amqp-client-0.1.0 vs lib/amqp/client/frames.rb in amqp-client-0.2.0
- old
+ new
@@ -1,27 +1,31 @@
# frozen_string_literal: true
+require_relative "./properties"
+require_relative "./table"
+
module AMQP
# Generate binary data for different frames
# Each frame type implemented as a method
# Having a class for each frame type is more expensive in terms of CPU and memory
module FrameBytes
module_function
- def connection_start_ok(response)
+ def connection_start_ok(response, properties)
+ prop_tbl = Table.encode(properties)
[
1, # type: method
0, # channel id
- 4 + 4 + 6 + 4 + response.bytesize + 1, # frame size
+ 2 + 2 + 4 + prop_tbl.bytesize + 6 + 4 + response.bytesize + 1, # frame size
10, # class id
11, # method id
- 0, # client props
+ prop_tbl.bytesize, prop_tbl, # client props
5, "PLAIN", # mechanism
response.bytesize, response,
0, "", # locale
206 # frame end
- ].pack("C S> L> S> S> L> Ca* L>a* Ca* C")
+ ].pack("C S> L> S> S> L>a* Ca* L>a* Ca* C")
end
def connection_tune_ok(channel_max, frame_max, heartbeat)
[
1, # type: method
@@ -103,84 +107,425 @@
0, # error method id
206 # frame end
].pack("C S> L> S> S> S> Ca* S> S> C")
end
- def queue_declare(id, name, passive, durable, exclusive, auto_delete)
+ def channel_close_ok(id)
+ [
+ 1, # type: method
+ id, # channel id
+ 4, # frame size
+ 20, # class: channel
+ 41, # method: close-ok
+ 206 # frame end
+ ].pack("C S> L> S> S> C")
+ end
+
+ def exchange_declare(id, name, type, passive, durable, auto_delete, internal, arguments)
no_wait = false
bits = 0
bits |= (1 << 0) if passive
bits |= (1 << 1) if durable
+ bits |= (1 << 2) if auto_delete
+ bits |= (1 << 3) if internal
+ bits |= (1 << 4) if no_wait
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + name.bytesize + 1 + type.bytesize + 1 + 4 + tbl.bytesize
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 40, # class: exchange
+ 10, # method: declare
+ 0, # reserved1
+ name.bytesize, name,
+ type.bytesize, type,
+ bits,
+ tbl.bytesize, tbl, # arguments
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* Ca* C L>a* C")
+ end
+
+ def exchange_delete(id, name, if_unused, no_wait)
+ bits = 0
+ bits |= (1 << 0) if if_unused
+ bits |= (1 << 1) if no_wait
+ frame_size = 2 + 2 + 2 + 1 + name.bytesize + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 40, # class: exchange
+ 20, # method: delete
+ 0, # reserved1
+ name.bytesize, name,
+ bits,
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* C C")
+ end
+
+ def exchange_bind(id, destination, source, binding_key, no_wait, arguments)
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + destination.bytesize + 1 + source.bytesize + 1 +
+ binding_key.bytesize + 1 + 4 + tbl.bytesize
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 40, # class: exchange
+ 30, # method: bind
+ 0, # reserved1
+ destination.bytesize, destination,
+ source.bytesize, source,
+ binding_key.bytesize, binding_key,
+ no_wait ? 1 : 0,
+ tbl.bytesize, tbl, # arguments
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* Ca* Ca* C L>a* C")
+ end
+
+ def exchange_unbind(id, destination, source, binding_key, no_wait, arguments)
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + destination.bytesize + 1 + source.bytesize + 1 +
+ binding_key.bytesize + 1 + 4 + tbl.bytesize
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 40, # class: exchange
+ 40, # method: unbind
+ 0, # reserved1
+ destination.bytesize, destination,
+ source.bytesize, source,
+ binding_key.bytesize, binding_key,
+ no_wait ? 1 : 0,
+ tbl.bytesize, tbl, # arguments
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* Ca* Ca* C L>a* C")
+ end
+
+ def queue_declare(id, name, passive, durable, exclusive, auto_delete, arguments)
+ no_wait = false
+ bits = 0
+ bits |= (1 << 0) if passive
+ bits |= (1 << 1) if durable
bits |= (1 << 2) if exclusive
bits |= (1 << 3) if auto_delete
bits |= (1 << 4) if no_wait
- frame_size = 2 + 2 + 2 + 1 + name.bytesize + 1 + 4
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + name.bytesize + 1 + 4 + tbl.bytesize
[
1, # type: method
id, # channel id
frame_size, # frame size
50, # class: queue
10, # method: declare
0, # reserved1
name.bytesize, name,
bits,
- 0, # arguments
+ tbl.bytesize, tbl, # arguments
206 # frame end
- ].pack("C S> L> S> S> S> Ca* C L> C")
+ ].pack("C S> L> S> S> S> Ca* C L>a* C")
end
- def basic_get(id, queue_name, no_ack)
- frame_size = 2 + 2 + 2 + 1 + queue_name.bytesize + 2 + 2
+ def queue_delete(id, name, if_unused, if_empty, no_wait)
+ bits = 0
+ bits |= (1 << 0) if if_unused
+ bits |= (1 << 1) if if_empty
+ bits |= (1 << 2) if no_wait
+ frame_size = 2 + 2 + 2 + 1 + name.bytesize + 1
[
1, # type: method
id, # channel id
frame_size, # frame size
+ 50, # class: queue
+ 40, # method: declare
+ 0, # reserved1
+ name.bytesize, name,
+ bits,
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* C C")
+ end
+
+ def queue_bind(id, queue, exchange, binding_key, no_wait, arguments)
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + queue.bytesize + 1 + exchange.bytesize + 1 +
+ binding_key.bytesize + 1 + 4 + tbl.bytesize
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 50, # class: queue
+ 20, # method: bind
+ 0, # reserved1
+ queue.bytesize, queue,
+ exchange.bytesize, exchange,
+ binding_key.bytesize, binding_key,
+ no_wait ? 1 : 0,
+ tbl.bytesize, tbl, # arguments
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* Ca* Ca* C L>a* C")
+ end
+
+ def queue_unbind(id, queue, exchange, binding_key, arguments)
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + queue.bytesize + 1 + exchange.bytesize + 1 + binding_key.bytesize + 4 + tbl.bytesize
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 50, # class: queue
+ 50, # method: unbind
+ 0, # reserved1
+ queue.bytesize, queue,
+ exchange.bytesize, exchange,
+ binding_key.bytesize, binding_key,
+ tbl.bytesize, tbl, # arguments
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* Ca* Ca* L>a* C")
+ end
+
+ def queue_purge(id, queue, no_wait)
+ frame_size = 2 + 2 + 2 + 1 + queue.bytesize + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 50, # class: queue
+ 30, # method: purge
+ 0, # reserved1
+ queue.bytesize, queue,
+ no_wait ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* C C")
+ end
+
+ def basic_get(id, queue, no_ack)
+ frame_size = 2 + 2 + 2 + 1 + queue.bytesize + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
60, # class: basic
70, # method: get
0, # reserved1
- queue_name.bytesize, queue_name,
+ queue.bytesize, queue,
no_ack ? 1 : 0,
206 # frame end
].pack("C S> L> S> S> S> Ca* C C")
end
- def basic_publish(id, exchange, routing_key)
+ def basic_publish(id, exchange, routing_key, mandatory)
frame_size = 2 + 2 + 2 + 1 + exchange.bytesize + 1 + routing_key.bytesize + 1
[
1, # type: method
id, # channel id
frame_size, # frame size
60, # class: basic
40, # method: publish
0, # reserved1
exchange.bytesize, exchange,
routing_key.bytesize, routing_key,
- 0, # bits, mandatory/immediate
+ mandatory ? 1 : 0, # bits, mandatory/immediate
206 # frame end
].pack("C S> L> S> S> S> Ca* Ca* C C")
end
def header(id, body_size, properties)
- frame_size = 2 + 2 + 8 + 2
+ props = Properties.new(**properties).encode
+ frame_size = 2 + 2 + 8 + props.bytesize
[
2, # type: header
id, # channel id
frame_size, # frame size
60, # class: basic
0, # weight
body_size,
- 0, # properties
+ props, # properties
206 # frame end
- ].pack("C S> L> S> S> Q> S> C")
+ ].pack("C S> L> S> S> Q> a* C")
end
def body(id, body_part)
[
3, # type: body
id, # channel id
body_part.bytesize, # frame size
body_part,
206 # frame end
].pack("C S> L> a* C")
+ end
+
+ def basic_consume(id, queue, tag, no_ack, exclusive, arguments)
+ no_local = false
+ no_wait = false
+ bits = 0
+ bits |= (1 << 0) if no_local
+ bits |= (1 << 1) if no_ack
+ bits |= (1 << 2) if exclusive
+ bits |= (1 << 3) if no_wait
+ tbl = Table.encode(arguments)
+ frame_size = 2 + 2 + 2 + 1 + queue.bytesize + 1 + tag.bytesize + 1 + 4 + tbl.bytesize
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 20, # method: consume
+ 0, # reserved1
+ queue.bytesize, queue,
+ tag.bytesize, tag,
+ bits, # bits
+ tbl.bytesize, tbl, # arguments
+ 206 # frame end
+ ].pack("C S> L> S> S> S> Ca* Ca* C L>a* C")
+ end
+
+ def basic_cancel(id, consumer_tag, no_wait: false)
+ frame_size = 2 + 2 + 1 + consumer_tag.bytesize + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 30, # method: cancel
+ consumer_tag.bytesize, consumer_tag,
+ no_wait ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> Ca* C C")
+ end
+
+ def basic_cancel_ok(id, consumer_tag)
+ frame_size = 2 + 2 + 1 + consumer_tag.bytesize + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 31, # method: cancel-ok
+ consumer_tag.bytesize, consumer_tag,
+ 206 # frame end
+ ].pack("C S> L> S> S> Ca* C")
+ end
+
+ def basic_ack(id, delivery_tag, multiple)
+ frame_size = 2 + 2 + 8 + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 80, # method: ack
+ delivery_tag,
+ multiple ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> Q> C C")
+ end
+
+ def basic_nack(id, delivery_tag, multiple, requeue)
+ bits = 0
+ bits |= (1 << 0) if multiple
+ bits |= (1 << 1) if requeue
+ frame_size = 2 + 2 + 8 + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 120, # method: nack
+ delivery_tag,
+ bits,
+ 206 # frame end
+ ].pack("C S> L> S> S> Q> C C")
+ end
+
+ def basic_reject(id, delivery_tag, requeue)
+ frame_size = 2 + 2 + 8 + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 90, # method: reject
+ delivery_tag,
+ requeue ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> Q> C C")
+ end
+
+ def basic_qos(id, prefetch_size, prefetch_count, global)
+ frame_size = 2 + 2 + 4 + 2 + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 10, # method: qos
+ prefetch_size,
+ prefetch_count,
+ global ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> L> S> C C")
+ end
+
+ def basic_recover(id, requeue)
+ frame_size = 2 + 2 + 1
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 60, # class: basic
+ 110, # method: recover
+ requeue ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> C C")
+ end
+
+ def confirm_select(id, no_wait)
+ [
+ 1, # type: method
+ id, # channel id
+ 5, # frame size
+ 85, # class: confirm
+ 10, # method: select
+ no_wait ? 1 : 0,
+ 206 # frame end
+ ].pack("C S> L> S> S> C C")
+ end
+
+ def tx_select(id)
+ frame_size = 2 + 2
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 90, # class: tx
+ 10, # method: select
+ 206 # frame end
+ ].pack("C S> L> S> S> C")
+ end
+
+ def tx_commit(id)
+ frame_size = 2 + 2
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 90, # class: tx
+ 20, # method: commit
+ 206 # frame end
+ ].pack("C S> L> S> S> C")
+ end
+
+ def tx_rollback(id)
+ frame_size = 2 + 2
+ [
+ 1, # type: method
+ id, # channel id
+ frame_size, # frame size
+ 90, # class: tx
+ 30, # method: rollback
+ 206 # frame end
+ ].pack("C S> L> S> S> C")
end
end
end