lib/bitcoin/network/connection_handler.rb in bitcoin-ruby-0.0.1 vs lib/bitcoin/network/connection_handler.rb in bitcoin-ruby-0.0.2

- old
+ new

@@ -1,117 +1,163 @@ +# encoding: ascii-8bit + require 'eventmachine' module Bitcoin::Network # Node network connection to a peer. Handles all the communication with a specific peer. - # TODO: incoming/outgoing? class ConnectionHandler < EM::Connection + LATENCY_MAX = (5*60*1000) # 5min in ms + include Bitcoin include Bitcoin::Storage - attr_reader :host, :port, :state, :version + attr_reader :host, :port, :version, :direction - def hth(h); h.unpack("H*")[0]; end - def htb(h); [h].pack("H*"); end + # :new, :handshake, :connected, :disconnected + attr_reader :state + # latency of this connection based on last ping/pong + attr_reader :latency_ms + def log @log ||= Logger::LogWrapper.new("#@host:#@port", @node.log) end # how long has this connection been open? def uptime @started ? (Time.now - @started).to_i : 0 end # create connection to +host+:+port+ for given +node+ - def initialize node, host, port - @node, @host, @port = node, host, port + def initialize node, host, port, direction + @node, @host, @port, @direction = node, host, port, direction @parser = Bitcoin::Protocol::Parser.new(self) @state = :new @version = nil @started = nil + @port, @host = *Socket.unpack_sockaddr_in(get_peername) if get_peername + @ping_nonce = nil + @latency_ms = nil + @lock = Monitor.new + @last_getblocks = [] # the last few getblocks messages received rescue Exception log.fatal { "Error in #initialize" } p $!; puts $@; exit end # check if connection is wanted, begin handshake if it is, disconnect if not def post_init - if @node.connections.size >= @node.config[:max][:connections] - return close_connection unless @node.config[:connect].include?([@host, @port.to_s]) + if incoming? + begin_handshake end - log.info { "Connected to #{@host}:#{@port}" } - @state = :established - @node.connections << self - on_handshake_begin rescue Exception log.fatal { "Error in #post_init" } - p $!; puts $@; exit + p $!; puts *$@ end + # only called for outgoing connection + def connection_completed + begin_handshake + rescue Exception + log.fatal { "Error in #connection_completed" } + p $!; puts *$@ + end + # receive data from peer and invoke Protocol::Parser def receive_data data #log.debug { "Receiving data (#{data.size} bytes)" } - @parser.parse(data) + @lock.synchronize { @parser.parse(data) } + rescue + log.warn { "Error handling data: #{data.hth}" } + p $!; puts *$@ end # connection closed; notify listeners and cleanup connection from node def unbind - log.info { "Disconnected #{@host}:#{@port}" } - @node.notifiers[:connection].push([:disconnected, [@host, @port]]) + log.info { "Disconnected" } + @node.push_notification(:connection, [:disconnected, [@host, @port]]) @state = :disconnected @node.connections.delete(self) end + # begin handshake + # TODO: disconnect if we don't complete within a reasonable time + def begin_handshake + if incoming? && !@node.accept_connections? + return close_connection unless @node.config[:connect].include?([@host, @port.to_s]) + end + log.info { "Established #{@direction} connection" } + @node.connections << self + @state = :handshake + # incoming connections wait to receive a version + send_version if outgoing? + rescue Exception + log.fatal { "Error in #begin_handshake" } + p $!; puts *$@ + end + + # complete handshake; set state, started time, notify listeners and add address to Node + def complete_handshake + if @state == :handshake + log.debug { 'Handshake completed' } + @state = :connected + @started = Time.now + @node.push_notification(:connection, [:connected, info]) + @node.addrs << addr + end + end + # received +inv_tx+ message for given +hash+. # add to inv_queue, unlesss maximum is reached def on_inv_transaction(hash) - log.debug { ">> inv transaction: #{hth(hash)}" } + log.debug { ">> inv transaction: #{hash.hth}" } + if @node.relay_propagation.keys.include?(hash.hth) + @node.relay_propagation[hash.hth] += 1 + end return if @node.inv_queue.size >= @node.config[:max][:inv] @node.queue_inv([:tx, hash, self]) end # received +inv_block+ message for given +hash+. # add to inv_queue, unless maximum is reached def on_inv_block(hash) - log.debug { ">> inv block: #{hth(hash)}" } + log.debug { ">> inv block: #{hash.hth}" } return if @node.inv_queue.size >= @node.config[:max][:inv] @node.queue_inv([:block, hash, self]) end # received +get_tx+ message for given +hash+. # send specified tx if we have it def on_get_transaction(hash) - log.debug { ">> get transaction: #{hash.unpack("H*")[0]}" } - tx = @node.store.get_tx(hash.unpack("H*")[0]) + log.debug { ">> get transaction: #{hash.hth}" } + tx = @node.store.get_tx(hash.hth) + tx ||= @node.relay_tx[hash.hth] return unless tx pkt = Bitcoin::Protocol.pkt("tx", tx.to_payload) log.debug { "<< tx: #{tx.hash}" } send_data pkt end # received +get_block+ message for given +hash+. # send specified block if we have it - # TODO def on_get_block(hash) - log.debug { ">> get block: #{hth(hash)}" } + log.debug { ">> get block: #{hash.hth}" } + blk = @node.store.get_block(hash.hth) + return unless blk + pkt = Bitcoin::Protocol.pkt("block", blk.to_payload) + log.debug { "<< block: #{blk.hash}" } + send_data pkt end - # send +inv+ message with given +type+ for given +obj+ - def send_inv type, obj - pkt = Protocol.inv_pkt(type, [[obj.hash].pack("H*")]) - log.debug { "<< inv #{type}: #{obj.hash}" } - send_data(pkt) - end - # received +addr+ message for given +addr+. # store addr in node and notify listeners def on_addr(addr) log.debug { ">> addr: #{addr.ip}:#{addr.port} alive: #{addr.alive?}, service: #{addr.service}" } @node.addrs << addr - @node.notifiers[:addr].push(addr) + @node.push_notification(:addr, addr) end # received +tx+ message for given +tx+. # push tx to storage queue def on_tx(tx) @@ -134,58 +180,118 @@ end # received +version+ message for given +version+. # send +verack+ message and complete handshake def on_version(version) - log.info { ">> version: #{version.version}" } + log.debug { ">> version: #{version.version}" } + @node.external_ips << version.to.split(":")[0] @version = version - log.info { "<< verack" } + log.debug { "<< verack" } send_data( Protocol.verack_pkt ) - on_handshake_complete + + # sometimes other nodes don't bother to send a verack back, + # but we can consider the handshake complete once we sent ours. + # apparently it can happen on incoming and outgoing connections alike + complete_handshake end # received +verack+ message. # complete handshake if it isn't completed already def on_verack - log.info { ">> verack" } - on_handshake_complete if handshake? + log.debug { ">> verack" } + complete_handshake if outgoing? end # received +alert+ message for given +alert+. # TODO: implement alert logic, store, display, relay def on_alert(alert) log.warn { ">> alert: #{alert.inspect}" } end + # received +getblocks+ message. + # TODO: locator fallback + def on_getblocks(version, hashes, stop_hash) + # remember the last few received getblocks messages and ignore duplicate ones + # fixes unexplained issue where remote node is bombarding us with the same getblocks + # message over and over (probably related to missing locator fallback handling) + return if @last_getblocks && @last_getblocks.include?([version, hashes, stop_hash]) + @last_getblocks << [version, hashes, stop_hash] + @last_getblocks.shift if @last_getblocks.size > 3 + + blk = @node.store.db[:blk][hash: hashes[0].htb.blob] + depth = blk[:depth] if blk + log.info { ">> getblocks #{hashes[0]} (#{depth || 'unknown'})" } + + return unless depth && depth <= @node.store.get_depth + range = (depth+1..depth+500) + blocks = @node.store.db[:blk].where(chain: 0, depth: range).select(:hash).all + + [@node.store.db[:blk].select(:hash)[chain: 0, depth: depth+502]] + send_inv(:block, *blocks.map {|b| b[:hash].hth }) + end + + # received +getaddr+ message. + # send +addr+ message with peer addresses back. + def on_getaddr + addrs = @node.addrs.select{|a| a.time > Time.now.to_i - 10800 }.shuffle[0..250] + log.debug { "<< addr (#{addrs.size})" } + send_data P::Addr.pkt(*addrs) + end + + # begin handshake; send +version+ message + def send_version + from = "#{@node.external_ip}:#{@node.config[:listen][1]}" + version = Bitcoin::Protocol::Version.new({ + :version => 70001, + :last_block => @node.store.get_depth, + :from => from, + :to => @host, + :user_agent => "/bitcoin-ruby:#{Bitcoin::VERSION}/", + #:user_agent => "/Satoshi:0.8.3/", + }) + send_data(version.to_pkt) + log.debug { "<< version: #{Bitcoin.network[:protocol_version]}" } + end + + # send +inv+ message with given +type+ for given +obj+ + def send_inv type, *hashes + hashes.each_slice(251) do |slice| + pkt = Protocol.inv_pkt(type, slice.map(&:htb)) + log.debug { "<< inv #{type}: #{slice[0][0..16]}" + (slice.size > 1 ? "..#{slice[-1][0..16]}" : "") } + send_data(pkt) + end + end + # send +getdata tx+ message for given tx +hash+ def send_getdata_tx(hash) pkt = Protocol.getdata_pkt(:tx, [hash]) - log.debug { "<< getdata tx: #{hth(hash)}" } + log.debug { "<< getdata tx: #{hash.hth}" } send_data(pkt) end # send +getdata block+ message for given block +hash+ def send_getdata_block(hash) pkt = Protocol.getdata_pkt(:block, [hash]) - log.debug { "<< getdata block: #{hth(hash)}" } + log.debug { "<< getdata block: #{hash.hth}" } send_data(pkt) end # send +getblocks+ message def send_getblocks locator = @node.store.get_locator - return get_genesis_block if @node.store.get_depth == -1 - pkt = Protocol.pkt("getblocks", [Bitcoin::network[:magic_head], - locator.size.chr, *locator.map{|l| htb(l).reverse}, "\x00"*32].join) + if @node.store.get_depth == -1 + EM.add_timer(3) { send_getblocks } + return get_genesis_block + end + pkt = Protocol.getblocks_pkt(@version.version, locator) log.info { "<< getblocks: #{locator.first}" } send_data(pkt) end # send +getheaders+ message def send_getheaders locator = @node.store.get_locator return get_genesis_block if @node.store.get_depth == -1 pkt = Protocol.pkt("getheaders", [Bitcoin::network[:magic_head], - locator.size.chr, *locator.map{|l| htb(l).reverse}, "\x00"*32].join) + locator.size.chr, *locator.map{|l| l.htb_reverse}, "\x00"*32].join) log.debug { "<< getheaders: #{locator.first}" } send_data(pkt) end # send +getaddr+ message @@ -195,65 +301,67 @@ end # send +ping+ message # TODO: wait for pong and disconnect if it doesn't arrive (and version is new enough) def send_ping - nonce = rand(0xffffffff) - log.debug { "<< ping (#{nonce})" } - send_data(Protocol.ping_pkt(nonce)) + if @version.version > Bitcoin::Protocol::BIP0031_VERSION + @latency_ms = LATENCY_MAX + @ping_nonce = rand(0xffffffff) + @ping_time = Time.now + log.debug { "<< ping (#{@ping_nonce})" } + send_data(Protocol.ping_pkt(@ping_nonce)) + else + # set latency to 5 seconds, terrible but this version should be obsolete now + @latency_ms = (5*1000) + log.debug { "<< ping" } + send_data(Protocol.ping_pkt) + end end # ask for the genesis block def get_genesis_block log.info { "Asking for genesis block" } - pkt = Protocol.getdata_pkt(:block, [htb(Bitcoin::network[:genesis_hash])]) + pkt = Protocol.getdata_pkt(:block, [Bitcoin::network[:genesis_hash].htb]) send_data(pkt) end - # complete handshake; set state, started time, notify listeners and add address to Node - def on_handshake_complete - return unless handshake? - log.debug { "handshake complete" } - @state = :connected - @started = Time.now - @node.notifiers[:connection].push([:connected, info]) - @node.addrs << addr - # send_getaddr - # EM.add_periodic_timer(15) { send_ping } - end - # received +ping+ message with given +nonce+. # send +pong+ message back, if +nonce+ is set. # network versions <=60000 don't set the nonce and don't expect a pong. def on_ping nonce log.debug { ">> ping (#{nonce})" } send_data(Protocol.pong_pkt(nonce)) if nonce end # received +pong+ message with given +nonce+. - # TODO: see #send_ping def on_pong nonce - log.debug { ">> pong (#{nonce})" } + if @ping_nonce == nonce + @latency_ms = (Time.now - @ping_time) * 1000.0 + end + log.debug { ">> pong (#{nonce}), latency: #{@latency_ms.to_i}ms" } end # begin handshake; send +version+ message def on_handshake_begin @state = :handshake - block = @node.store.get_depth - from = "127.0.0.1:8333" - from_id = Bitcoin::Protocol::Uniq - to = @node.config[:listen].join(':') - - pkt = Protocol.version_pkt(from_id, from, to, block) - log.info { "<< version (#{Bitcoin::Protocol::VERSION})" } - send_data(pkt) + from = "#{@node.external_ip}:#{@node.config[:listen][1]}" + version = Bitcoin::Protocol::Version.new({ + :version => 70001, + :last_block => @node.store.get_depth, + :from => from, + :to => @host, + :user_agent => "/bitcoin-ruby:#{Bitcoin::VERSION}/", + #:user_agent => "/Satoshi:0.8.1/", + }) + send_data(version.to_pkt) + log.debug { "<< version (#{Bitcoin.network[:protocol_version]})" } end # get Addr object for this connection def addr return @addr if @addr - @addr = Bitcoin::Protocol::Addr.new + @addr = P::Addr.new @addr.time, @addr.service, @addr.ip, @addr.port = Time.now.tv_sec, @version.services, @host, @port @addr end @@ -263,12 +371,16 @@ # get info hash def info { :host => @host, :port => @port, :state => @state, - :version => @version.version, :block => @version.block, :started => @started.to_i, - :user_agent => @version.user_agent + :version => (@version.version rescue 0), :block => @version.last_block, + :started => @started.to_i, :user_agent => @version.user_agent } end + + def incoming?; @direction == :in; end + def outgoing?; @direction == :out; end + end end