lib/tapyrus/network/message_handler.rb in tapyrus-0.2.7 vs lib/tapyrus/network/message_handler.rb in tapyrus-0.2.8
- old
+ new
@@ -1,11 +1,9 @@
module Tapyrus
module Network
-
# P2P message handler used by peer connection class.
module MessageHandler
-
# handle p2p message.
def handle(message)
peer.last_recv = Time.now.to_i
peer.bytes_recv += message.bytesize
begin
@@ -20,11 +18,11 @@
@message += message
command, payload, rest = parse_header
return unless command
defer_handle_command(command, payload)
- @message = ""
+ @message = ''
parse(rest) if rest && rest.bytesize > 0
end
def parse_header
head_magic = Tapyrus.chain_params.magic_head
@@ -33,74 +31,77 @@
magic, command, length, checksum = @message.unpack('a4A12Va4')
raise Tapyrus::Message::Error, "invalid header magic. #{magic.bth}" unless magic.bth == head_magic
payload = @message[MESSAGE_HEADER_SIZE...(MESSAGE_HEADER_SIZE + length)]
return if payload.size < length
- raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}" unless Tapyrus.double_sha256(payload)[0...4] == checksum
+ unless Tapyrus.double_sha256(payload)[0...4] == checksum
+ raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}"
+ end
rest = @message[(MESSAGE_HEADER_SIZE + length)..-1]
[command, payload, rest]
end
# handle command with EM#defer
def defer_handle_command(command, payload)
- operation = proc {handle_command(command, payload)}
- callback = proc{|result|}
- errback = proc{|e|
- logger.error("error occurred. #{e.message}")
- logger.error(e.backtrace)
- peer.handle_error(e)
- }
+ operation = proc { handle_command(command, payload) }
+ callback = proc { |result| }
+ errback =
+ proc do |e|
+ logger.error("error occurred. #{e.message}")
+ logger.error(e.backtrace)
+ peer.handle_error(e)
+ end
EM.defer(operation, callback, errback)
end
def handle_command(command, payload)
logger.info("[#{addr}] process command #{command}.")
case command
- when Tapyrus::Message::Version::COMMAND
- on_version(Tapyrus::Message::Version.parse_from_payload(payload))
- when Tapyrus::Message::VerAck::COMMAND
- on_ver_ack
- when Tapyrus::Message::GetAddr::COMMAND
- on_get_addr
- when Tapyrus::Message::Addr::COMMAND
- on_addr(Tapyrus::Message::Addr.parse_from_payload(payload))
- when Tapyrus::Message::SendHeaders::COMMAND
- on_send_headers
- when Tapyrus::Message::FeeFilter::COMMAND
- on_fee_filter(Tapyrus::Message::FeeFilter.parse_from_payload(payload))
- when Tapyrus::Message::Ping::COMMAND
- on_ping(Tapyrus::Message::Ping.parse_from_payload(payload))
- when Tapyrus::Message::Pong::COMMAND
- on_pong(Tapyrus::Message::Pong.parse_from_payload(payload))
- when Tapyrus::Message::GetHeaders::COMMAND
- on_get_headers(Tapyrus::Message::GetHeaders.parse_from_payload(payload))
- when Tapyrus::Message::Headers::COMMAND
- on_headers(Tapyrus::Message::Headers.parse_from_payload(payload))
- when Tapyrus::Message::Block::COMMAND
- on_block(Tapyrus::Message::Block.parse_from_payload(payload))
- when Tapyrus::Message::Tx::COMMAND
- on_tx(Tapyrus::Message::Tx.parse_from_payload(payload))
- when Tapyrus::Message::NotFound::COMMAND
- on_not_found(Tapyrus::Message::NotFound.parse_from_payload(payload))
- when Tapyrus::Message::MemPool::COMMAND
- on_mem_pool
- when Tapyrus::Message::Reject::COMMAND
- on_reject(Tapyrus::Message::Reject.parse_from_payload(payload))
- when Tapyrus::Message::SendCmpct::COMMAND
- on_send_cmpct(Tapyrus::Message::SendCmpct.parse_from_payload(payload))
- when Tapyrus::Message::Inv::COMMAND
- on_inv(Tapyrus::Message::Inv.parse_from_payload(payload))
- when Tapyrus::Message::MerkleBlock::COMMAND
- on_merkle_block(Tapyrus::Message::MerkleBlock.parse_from_payload(payload))
- when Tapyrus::Message::CmpctBlock::COMMAND
- on_cmpct_block(Tapyrus::Message::CmpctBlock.parse_from_payload(payload))
- when Tapyrus::Message::GetData::COMMAND
- on_get_data(Tapyrus::Message::GetData.parse_from_payload(payload))
- else
- logger.warn("unsupported command received. command: #{command}, payload: #{payload.bth}")
- close("with command #{command}")
+ when Tapyrus::Message::Version::COMMAND
+ on_version(Tapyrus::Message::Version.parse_from_payload(payload))
+ when Tapyrus::Message::VerAck::COMMAND
+ on_ver_ack
+ when Tapyrus::Message::GetAddr::COMMAND
+ on_get_addr
+ when Tapyrus::Message::Addr::COMMAND
+ on_addr(Tapyrus::Message::Addr.parse_from_payload(payload))
+ when Tapyrus::Message::SendHeaders::COMMAND
+ on_send_headers
+ when Tapyrus::Message::FeeFilter::COMMAND
+ on_fee_filter(Tapyrus::Message::FeeFilter.parse_from_payload(payload))
+ when Tapyrus::Message::Ping::COMMAND
+ on_ping(Tapyrus::Message::Ping.parse_from_payload(payload))
+ when Tapyrus::Message::Pong::COMMAND
+ on_pong(Tapyrus::Message::Pong.parse_from_payload(payload))
+ when Tapyrus::Message::GetHeaders::COMMAND
+ on_get_headers(Tapyrus::Message::GetHeaders.parse_from_payload(payload))
+ when Tapyrus::Message::Headers::COMMAND
+ on_headers(Tapyrus::Message::Headers.parse_from_payload(payload))
+ when Tapyrus::Message::Block::COMMAND
+ on_block(Tapyrus::Message::Block.parse_from_payload(payload))
+ when Tapyrus::Message::Tx::COMMAND
+ on_tx(Tapyrus::Message::Tx.parse_from_payload(payload))
+ when Tapyrus::Message::NotFound::COMMAND
+ on_not_found(Tapyrus::Message::NotFound.parse_from_payload(payload))
+ when Tapyrus::Message::MemPool::COMMAND
+ on_mem_pool
+ when Tapyrus::Message::Reject::COMMAND
+ on_reject(Tapyrus::Message::Reject.parse_from_payload(payload))
+ when Tapyrus::Message::SendCmpct::COMMAND
+ on_send_cmpct(Tapyrus::Message::SendCmpct.parse_from_payload(payload))
+ when Tapyrus::Message::Inv::COMMAND
+ on_inv(Tapyrus::Message::Inv.parse_from_payload(payload))
+ when Tapyrus::Message::MerkleBlock::COMMAND
+ on_merkle_block(Tapyrus::Message::MerkleBlock.parse_from_payload(payload))
+ when Tapyrus::Message::CmpctBlock::COMMAND
+ on_cmpct_block(Tapyrus::Message::CmpctBlock.parse_from_payload(payload))
+ when Tapyrus::Message::GetData::COMMAND
+ on_get_data(Tapyrus::Message::GetData.parse_from_payload(payload))
+ else
+ logger.warn("unsupported command received. command: #{command}, payload: #{payload.bth}")
+ close("with command #{command}")
end
end
def send_message(msg)
logger.info "send message #{msg.class::COMMAND}"
@@ -210,15 +211,15 @@
logger.info('receive inv message.')
blocks = []
txs = []
inv.inventories.each do |i|
case i.identifier
- when Tapyrus::Message::Inventory::MSG_TX
- txs << i.hash
- when Tapyrus::Message::Inventory::MSG_BLOCK
- blocks << i.hash
- else
- logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}")
+ when Tapyrus::Message::Inventory::MSG_TX
+ txs << i.hash
+ when Tapyrus::Message::Inventory::MSG_BLOCK
+ blocks << i.hash
+ else
+ logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}")
end
end
logger.info("receive block= #{blocks.size}, txs: #{txs.size}")
peer.handle_block_inv(blocks) unless blocks.empty?
end