lib/tapyrus/network/message_handler.rb in tapyrus-0.2.7 vs lib/tapyrus/network/message_handler.rb in tapyrus-0.2.8

- old
+ new

@@ -1,11 +1,9 @@ module Tapyrus module Network - # P2P message handler used by peer connection class. module MessageHandler - # handle p2p message. def handle(message) peer.last_recv = Time.now.to_i peer.bytes_recv += message.bytesize begin @@ -20,11 +18,11 @@ @message += message command, payload, rest = parse_header return unless command defer_handle_command(command, payload) - @message = "" + @message = '' parse(rest) if rest && rest.bytesize > 0 end def parse_header head_magic = Tapyrus.chain_params.magic_head @@ -33,74 +31,77 @@ magic, command, length, checksum = @message.unpack('a4A12Va4') raise Tapyrus::Message::Error, "invalid header magic. #{magic.bth}" unless magic.bth == head_magic payload = @message[MESSAGE_HEADER_SIZE...(MESSAGE_HEADER_SIZE + length)] return if payload.size < length - raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}" unless Tapyrus.double_sha256(payload)[0...4] == checksum + unless Tapyrus.double_sha256(payload)[0...4] == checksum + raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}" + end rest = @message[(MESSAGE_HEADER_SIZE + length)..-1] [command, payload, rest] end # handle command with EM#defer def defer_handle_command(command, payload) - operation = proc {handle_command(command, payload)} - callback = proc{|result|} - errback = proc{|e| - logger.error("error occurred. #{e.message}") - logger.error(e.backtrace) - peer.handle_error(e) - } + operation = proc { handle_command(command, payload) } + callback = proc { |result| } + errback = + proc do |e| + logger.error("error occurred. #{e.message}") + logger.error(e.backtrace) + peer.handle_error(e) + end EM.defer(operation, callback, errback) end def handle_command(command, payload) logger.info("[#{addr}] process command #{command}.") case command - when Tapyrus::Message::Version::COMMAND - on_version(Tapyrus::Message::Version.parse_from_payload(payload)) - when Tapyrus::Message::VerAck::COMMAND - on_ver_ack - when Tapyrus::Message::GetAddr::COMMAND - on_get_addr - when Tapyrus::Message::Addr::COMMAND - on_addr(Tapyrus::Message::Addr.parse_from_payload(payload)) - when Tapyrus::Message::SendHeaders::COMMAND - on_send_headers - when Tapyrus::Message::FeeFilter::COMMAND - on_fee_filter(Tapyrus::Message::FeeFilter.parse_from_payload(payload)) - when Tapyrus::Message::Ping::COMMAND - on_ping(Tapyrus::Message::Ping.parse_from_payload(payload)) - when Tapyrus::Message::Pong::COMMAND - on_pong(Tapyrus::Message::Pong.parse_from_payload(payload)) - when Tapyrus::Message::GetHeaders::COMMAND - on_get_headers(Tapyrus::Message::GetHeaders.parse_from_payload(payload)) - when Tapyrus::Message::Headers::COMMAND - on_headers(Tapyrus::Message::Headers.parse_from_payload(payload)) - when Tapyrus::Message::Block::COMMAND - on_block(Tapyrus::Message::Block.parse_from_payload(payload)) - when Tapyrus::Message::Tx::COMMAND - on_tx(Tapyrus::Message::Tx.parse_from_payload(payload)) - when Tapyrus::Message::NotFound::COMMAND - on_not_found(Tapyrus::Message::NotFound.parse_from_payload(payload)) - when Tapyrus::Message::MemPool::COMMAND - on_mem_pool - when Tapyrus::Message::Reject::COMMAND - on_reject(Tapyrus::Message::Reject.parse_from_payload(payload)) - when Tapyrus::Message::SendCmpct::COMMAND - on_send_cmpct(Tapyrus::Message::SendCmpct.parse_from_payload(payload)) - when Tapyrus::Message::Inv::COMMAND - on_inv(Tapyrus::Message::Inv.parse_from_payload(payload)) - when Tapyrus::Message::MerkleBlock::COMMAND - on_merkle_block(Tapyrus::Message::MerkleBlock.parse_from_payload(payload)) - when Tapyrus::Message::CmpctBlock::COMMAND - on_cmpct_block(Tapyrus::Message::CmpctBlock.parse_from_payload(payload)) - when Tapyrus::Message::GetData::COMMAND - on_get_data(Tapyrus::Message::GetData.parse_from_payload(payload)) - else - logger.warn("unsupported command received. command: #{command}, payload: #{payload.bth}") - close("with command #{command}") + when Tapyrus::Message::Version::COMMAND + on_version(Tapyrus::Message::Version.parse_from_payload(payload)) + when Tapyrus::Message::VerAck::COMMAND + on_ver_ack + when Tapyrus::Message::GetAddr::COMMAND + on_get_addr + when Tapyrus::Message::Addr::COMMAND + on_addr(Tapyrus::Message::Addr.parse_from_payload(payload)) + when Tapyrus::Message::SendHeaders::COMMAND + on_send_headers + when Tapyrus::Message::FeeFilter::COMMAND + on_fee_filter(Tapyrus::Message::FeeFilter.parse_from_payload(payload)) + when Tapyrus::Message::Ping::COMMAND + on_ping(Tapyrus::Message::Ping.parse_from_payload(payload)) + when Tapyrus::Message::Pong::COMMAND + on_pong(Tapyrus::Message::Pong.parse_from_payload(payload)) + when Tapyrus::Message::GetHeaders::COMMAND + on_get_headers(Tapyrus::Message::GetHeaders.parse_from_payload(payload)) + when Tapyrus::Message::Headers::COMMAND + on_headers(Tapyrus::Message::Headers.parse_from_payload(payload)) + when Tapyrus::Message::Block::COMMAND + on_block(Tapyrus::Message::Block.parse_from_payload(payload)) + when Tapyrus::Message::Tx::COMMAND + on_tx(Tapyrus::Message::Tx.parse_from_payload(payload)) + when Tapyrus::Message::NotFound::COMMAND + on_not_found(Tapyrus::Message::NotFound.parse_from_payload(payload)) + when Tapyrus::Message::MemPool::COMMAND + on_mem_pool + when Tapyrus::Message::Reject::COMMAND + on_reject(Tapyrus::Message::Reject.parse_from_payload(payload)) + when Tapyrus::Message::SendCmpct::COMMAND + on_send_cmpct(Tapyrus::Message::SendCmpct.parse_from_payload(payload)) + when Tapyrus::Message::Inv::COMMAND + on_inv(Tapyrus::Message::Inv.parse_from_payload(payload)) + when Tapyrus::Message::MerkleBlock::COMMAND + on_merkle_block(Tapyrus::Message::MerkleBlock.parse_from_payload(payload)) + when Tapyrus::Message::CmpctBlock::COMMAND + on_cmpct_block(Tapyrus::Message::CmpctBlock.parse_from_payload(payload)) + when Tapyrus::Message::GetData::COMMAND + on_get_data(Tapyrus::Message::GetData.parse_from_payload(payload)) + else + logger.warn("unsupported command received. command: #{command}, payload: #{payload.bth}") + close("with command #{command}") end end def send_message(msg) logger.info "send message #{msg.class::COMMAND}" @@ -210,15 +211,15 @@ logger.info('receive inv message.') blocks = [] txs = [] inv.inventories.each do |i| case i.identifier - when Tapyrus::Message::Inventory::MSG_TX - txs << i.hash - when Tapyrus::Message::Inventory::MSG_BLOCK - blocks << i.hash - else - logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}") + when Tapyrus::Message::Inventory::MSG_TX + txs << i.hash + when Tapyrus::Message::Inventory::MSG_BLOCK + blocks << i.hash + else + logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}") end end logger.info("receive block= #{blocks.size}, txs: #{txs.size}") peer.handle_block_inv(blocks) unless blocks.empty? end