lib/bitcoin/network/node.rb in bitcoin-ruby-0.0.1 vs lib/bitcoin/network/node.rb in bitcoin-ruby-0.0.2

- old
+ new

@@ -1,5 +1,7 @@ +# encoding: ascii-8bit + Bitcoin.require_dependency :eventmachine Bitcoin.require_dependency :json require 'fileutils' module Bitcoin::Network @@ -34,169 +36,257 @@ attr_reader :addrs # clients to be notified for new block/tx events attr_reader :notifiers - attr_reader :in_sync + # our external ip addresses we got told by peers + attr_accessor :external_ips + # time when the last main chain block was added + attr_reader :last_block_time + + attr_accessor :relay_tx + attr_accessor :relay_propagation + + DEFAULT_CONFIG = { + :network => :bitcoin, :listen => ["0.0.0.0", Bitcoin.network[:default_port]], :connect => [], - :command => "", - :storage => Bitcoin::Storage.dummy({}), - :headers_only => false, + :command => ["127.0.0.1", 9999], + :storage => "utxo::sqlite://~/.bitcoin-ruby/<network>/blocks.db", + :mode => :full, :dns => true, - :epoll => false, :epoll_limit => 10000, :epoll_user => nil, - :addr_file => "#{ENV['HOME']}/.bitcoin-ruby/addrs.json", + :addr_file => "~/.bitcoin-ruby/<network>/peers.json", :log => { :network => :info, :storage => :info, }, :max => { + :connections_out => 8, + :connections_in => 32, :connections => 8, :addr => 256, - :queue => 64, - :inv => 128, - :inv_cache => 1024, + :queue => 501, + :inv => 501, + :inv_cache => 0, + :unconfirmed => 100, }, :intervals => { - :queue => 5, - :inv_queue => 5, + :queue => 1, + :inv_queue => 1, :addrs => 5, - :connect => 15, - :relay => 600, + :connect => 5, + :relay => 0, }, + :import => nil, + :skip_validation => false, + :check_blocks => 1000, } def initialize config = {} @config = DEFAULT_CONFIG.deep_merge(config) @log = Bitcoin::Logger.create(:network, @config[:log][:network]) - @connections = [] - @command_connections = [] - @queue = [] - @queue_thread = nil - @inv_queue = [] - @inv_queue_thread = nil + @connections, @command_connections = [], [] + @queue, @queue_thread, @inv_queue, @inv_queue_thread = [], nil, [], nil set_store load_addrs @timers = {} @inv_cache = [] - @notifiers = Hash[[:block, :tx, :connection, :addr].map {|n| [n, EM::Channel.new]}] - @in_sync = false + @notifiers = {} + @relay_propagation, @last_block_time, @external_ips = {}, Time.now, [] + @unconfirmed, @relay_tx = {}, {} end def set_store backend, config = @config[:storage].split('::') - @store = Bitcoin::Storage.send(backend, {:db => config}, ->(locator) { + @store = Bitcoin::Storage.send(backend, { + db: config, mode: @config[:mode], cache_head: true, + skip_validation: @config[:skip_validation], + log_level: @config[:log][:storage]}, ->(locator) { peer = @connections.select(&:connected?).sample peer.send_getblocks(locator) }) @store.log.level = @config[:log][:storage] + @store.check_consistency(@config[:check_blocks]) + if @config[:import] + @importing = true + EM.defer do + begin + @store.import(@config[:import]); @importing = false + rescue + log.fatal { $!.message } + puts *$@ + stop + end + end + end end def load_addrs - unless File.exist?(@config[:addr_file]) + file = @config[:addr_file].sub("~", ENV["HOME"]) + .sub("<network>", Bitcoin.network_name.to_s) + unless File.exist?(file) @addrs = [] + FileUtils.mkdir_p(File.dirname(file)) return end - @addrs = JSON.load(File.read(@config[:addr_file])).map do |a| + @addrs = JSON.load(File.read(file)).map do |a| addr = Bitcoin::P::Addr.new addr.time, addr.service, addr.ip, addr.port = a['time'], a['service'], a['ip'], a['port'] addr end - log.info { "Initialized #{@addrs.size} addrs from #{@config[:addr_file]}." } + log.info { "Initialized #{@addrs.size} addrs from #{file}." } + rescue + @addrs = [] + log.warn { "Error loading addrs from #{file}." } end def store_addrs return if !@addrs || !@addrs.any? - file = @config[:addr_file] + file = @config[:addr_file].sub("~", ENV["HOME"]) + .sub("<network>", Bitcoin.network_name.to_s) FileUtils.mkdir_p(File.dirname(file)) File.open(file, 'w') do |f| addrs = @addrs.map {|a| Hash[[:time, :service, :ip, :port].zip(a.entries)] rescue nil }.compact f.write(JSON.pretty_generate(addrs)) end - log.info { "Stored #{@addrs.size} addrs to #{file}" } + log.info { "Stored #{@addrs.size} addrs to #{file}." } rescue log.warn { "Error storing addrs to #{file}." } end def stop - log.info { "Shutting down..." } + puts "Shutting down..." + stop_timers EM.stop end def uptime (Time.now - @started).to_i end + def start_timers + return EM.add_timer(1) { start_timers } if @importing + [:queue, :inv_queue, :addrs, :connect, :relay].each do |name| + interval = @config[:intervals][name].to_f + next if !interval || interval == 0.0 + @timers[name] = EM.add_periodic_timer(interval, method("work_#{name}")) + end + end + + def stop_timers + @timers.each {|n, t| EM.cancel_timer t } + end + + # initiate epoll with given file descriptor and set effective user + def epoll_init + log.info { "EPOLL: Available file descriptors: " + + EM.set_descriptor_table_size(@config[:epoll_limit]).to_s } + if @config[:epoll_user] + EM.set_effective_user(@config[:epoll_user]) + log.info { "EPOLL: Effective user set to: #{@config[:epoll_user]}" } + end + EM.epoll = true + end + def run @started = Time.now EM.add_shutdown_hook do store_addrs log.info { "Bye" } end - init_epoll if @config[:epoll] + # enable kqueue (BSD, OS X) + if EM.kqueue? + log.info { 'Using BSD kqueue' } + EM.kqueue = true + end + # enable epoll (Linux) + if EM.epoll? + log.info { 'Using Linux epoll' } + epoll_init + end + EM.run do - [:addrs, :connect, :relay].each do |name| - interval = @config[:intervals][name] - next if !interval || interval == 0 - @timers[name] = EM.add_periodic_timer(interval, method("work_#{name}")) - end - if @config[:command] - host, port = @config[:command] + start_timers + + host, port = *@config[:command] + port ||= Bitcoin.network[:default_port] + if host + log.debug { "Trying to bind command socket to #{host}:#{port}" } EM.start_server(host, port, CommandHandler, self) log.info { "Command socket listening on #{host}:#{port}" } end - if @config[:listen] - host, port = @config[:listen] - EM.start_server(host, port.to_i, ConnectionHandler, self, host, port.to_i) + host, port = *@config[:listen] + port ||= Bitcoin.network[:default_port] + if host + log.debug { "Trying to bind server socket to #{host}:#{port}" } + EM.start_server(host, port.to_i, ConnectionHandler, self, host, port.to_i, :in) log.info { "Server socket listening on #{host}:#{port}" } end - if @config[:connect].any? - @config[:connect].each{|host| connect_peer(*host) } + @config[:connect].each do |host, port| + port ||= Bitcoin.network[:default_port] + connect_peer(host, port) + log.info { "Connecting to #{host}:#{port}" } end work_connect if @addrs.any? connect_dns if @config[:dns] - work_inv_queue - work_queue + + Signal.trap("INT") do + puts "Shutting down. You can force-quit by pressing Ctrl-C again, but it might corrupt your database!" + Signal.trap("INT") do + puts "Force Quit" + exit 1 + end + self.stop + end + end end # connect to peer at given +host+ / +port+ def connect_peer host, port - return if @connections.map{|c| c.host}.include?(host) - log.info { "Attempting to connect to #{host}:#{port}" } - EM.connect(host, port.to_i, ConnectionHandler, self, host, port.to_i) + return if @connections.map{|c| c.host }.include?(host) + log.debug { "Attempting to connect to #{host}:#{port}" } + EM.connect(host, port.to_i, ConnectionHandler, self, host, port.to_i, :out) rescue - log.warn { "Error connecting to #{host}:#{port}" } + log.debug { "Error connecting to #{host}:#{port}" } log.debug { $!.inspect } end # query addrs from dns seed and connect def connect_dns unless Bitcoin.network[:dns_seeds].any? - return log.warn { "No DNS seed nodes available" } + log.warn { "No DNS seed nodes available" } + return connect_known_peers end connect_dns_resolver(Bitcoin.network[:dns_seeds].sample) do |addrs| log.debug { "DNS returned addrs: #{addrs.inspect}" } - addrs.sample(@config[:max][:connections] / 2).uniq.each do |addr| + addrs.sample(@config[:max][:connections_out] / 2).uniq.each do |addr| connect_peer(addr, Bitcoin.network[:default_port]) end end end + def connect_known_peers + log.debug { "Attempting to connecting to known nodes" } + Bitcoin.network[:known_nodes].shuffle[0..3].each do |node| + connect_peer node, Bitcoin.network[:default_port] + end + end + # get peer addrs from given dns +seed+ using em/dns_resolver. # fallback to using `nslookup` if it is not installed or fails. def connect_dns_resolver(seed) if Bitcoin.require_dependency "em/dns_resolver", gem: "em-dns", exit: false log.info { "Querying addresses from DNS seed: #{seed}" } @@ -223,11 +313,11 @@ # check if there are enough connections and try to # establish new ones if needed def work_connect log.debug { "Connect worker running" } - desired = @config[:max][:connections] - @connections.size + desired = @config[:max][:connections_out] - @connections.select(&:outgoing?).size return if desired <= 0 desired = 32 if desired > 32 # connect to max 32 peers at once if addrs.any? addrs.sample(desired) do |addr| Time.now.tv_sec + 10800 - addr.time @@ -244,13 +334,14 @@ # query blocks from random peer def getblocks locator = store.get_locator peer = @connections.select(&:connected?).sample return unless peer log.info { "querying blocks from #{peer.host}:#{peer.port}" } - if @config[:headers_only] + case @config[:mode] + when /lite/ peer.send_getheaders locator unless @queue.size >= @config[:max][:queue] - else + when /full|pruned/ peer.send_getblocks locator unless @inv_queue.size >= @config[:max][:inv] end end # check if the addr store is full and request new addrs @@ -266,88 +357,122 @@ end # check for new items in the queue and process them def work_queue @log.debug { "queue worker running" } - EM.defer(nil, proc { work_queue }) do - if @queue.size == 0 - getblocks if @inv_queue.size == 0 && !@in_sync - sleep @config[:intervals][:queue] + return getblocks if @queue.size == 0 + + # switch off utxo cache once there aren't tons of new blocks coming in + if @store.in_sync? + if @store.is_a?(Bitcoin::Storage::Backends::UtxoStore) && @store.config[:utxo_cache] > 0 + log.debug { "switching off utxo cache" } + @store.config[:utxo_cache] = 0 end - while obj = @queue.shift - begin - if @store.send("store_#{obj[0]}", obj[1]) - if obj[0].to_sym == :block - block = @store.get_block(obj[1].hash) - @notifiers[:block].push([obj[1], block.depth]) if block.chain == 0 - else - @notifiers[:tx].push([obj[1]]) + @config[:intervals].each do |name, value| + if value <= 1 + log.debug { "setting #{name} interval to 5 seconds" } + @config[:intervals][name] = 5 + end + end + end + + while obj = @queue.shift + begin + if obj[0].to_sym == :block + if res = @store.send("new_#{obj[0]}", obj[1]) + if res[1] == 0 && obj[1].hash == @store.get_head.hash + @last_block_time = Time.now + push_notification(:block, [obj[1], res[0]]) + obj[1].tx.each {|tx| @unconfirmed.delete(tx.hash) } end + getblocks if res[1] == 2 && @store.in_sync? end - rescue - @log.warn { $!.inspect } - puts *$@ + else + drop = @unconfirmed.size - @config[:max][:unconfirmed] + 1 + drop.times { @unconfirmed.shift } if drop > 0 + unless @unconfirmed[obj[1].hash] + @unconfirmed[obj[1].hash] = obj[1] + push_notification(:tx, [obj[1], 0]) + + if @notifiers[:output] + obj[1].out.each do |out| + address = Bitcoin::Script.new(out.pk_script).get_address + push_notification(:output, [obj[1].hash, address, out.value, 0]) + end + end + end end + rescue Bitcoin::Validation::ValidationError + @log.warn { "ValiationError storing #{obj[0]} #{obj[1].hash}: #{$!.message}" } + # File.open("./validation_error_#{obj[0]}_#{obj[1].hash}.bin", "w") {|f| + # f.write(obj[1].to_payload) } + # EM.stop + rescue + @log.warn { $!.inspect } + puts *$@ end - @in_sync = (@store.get_head && (Time.now - @store.get_head.time).to_i < 3600) ? true : false end end # check for new items in the inv queue and process them, # unless the queue is already full def work_inv_queue - EM.defer(nil, proc { work_inv_queue }) do - sleep @config[:intervals][:inv_queue] if @inv_queue.size == 0 - @log.debug { "inv queue worker running" } - if @queue.size >= @config[:max][:queue] - sleep @config[:intervals][:inv_queue] - else - while inv = @inv_queue.shift - next if !@in_sync && inv[0] == :tx - next if @queue.map{|i|i[1]}.map(&:hash).include?(inv[1]) - # next if @store.send("has_#{inv[0]}", inv[1]) - inv[2].send("send_getdata_#{inv[0]}", inv[1]) - end - end + @log.debug { "inv queue worker running" } + return if @inv_queue.size == 0 + return if @queue.size >= @config[:max][:queue] + while inv = @inv_queue.shift + next if !@store.in_sync? && inv[0] == :tx && @notifiers.empty? + next if @queue.map{|i|i[1]}.map(&:hash).include?(inv[1]) + inv[2].send("send_getdata_#{inv[0]}", inv[1]) end end # queue inv, caching the most current ones def queue_inv inv - @inv_cache.shift(128) if @inv_cache.size > @config[:max][:inv_cache] - return if @inv_cache.include?([inv[0], inv[1]]) || - @inv_queue.size >= @config[:max][:inv] || - (!@in_sync && inv[0] == :tx) - @inv_cache << [inv[0], inv[1]] - @inv_queue << inv - end + hash = inv[1].unpack("H*")[0] + return if @inv_queue.include?(inv) || @queue.select {|i| i[1].hash == hash }.any? + return if @store.send("has_#{inv[0]}", hash) - # initiate epoll with given file descriptor and set effective user - def init_epoll - log.info { "EPOLL: Available file descriptors: " + - EM.set_descriptor_table_size(@config[:epoll_limit]).to_s } - if @config[:epoll_user] - EM.set_effective_user(@config[:epoll_user]) - log.info { "EPOLL: Effective user set to: #{@config[:epoll_user]}" } - end - EM.epoll +# @inv_cache.shift(128) if @inv_cache.size > @config[:max][:inv_cache] +# return if @inv_cache.include?([inv[0], inv[1]]) || +# @inv_queue.size >= @config[:max][:inv] || +# (!@store.in_sync? && inv[0] == :tx) +# @inv_cache << [inv[0], inv[1]] + @inv_queue << inv end - def relay_tx(tx) - return false unless @in_sync - @store.store_tx(tx) - @connections.select(&:connected?).sample((@connections.size / 2) + 1).each do |peer| - peer.send_inv(:tx, tx) - end - end - def work_relay log.debug { "relay worker running" } @store.get_unconfirmed_tx.each do |tx| - log.info { "relaying tx #{tx.hash}" } relay_tx(tx) end + end + + # get the external ip that was suggested in version messages + # from other peers most often. + def external_ip + @external_ips.group_by(&:dup).values.max_by(&:size).first + rescue + @config[:listen][0] + end + + # push notification +message+ to +channel+ + def push_notification channel, message + @notifiers[channel.to_sym].push(message) if @notifiers[channel.to_sym] + end + + # subscribe to notification +channel+. + # available channels are: block, tx, output, connection. + # see CommandHandler for details. + def subscribe channel + @notifiers[channel.to_sym] ||= EM::Channel.new + @notifiers[channel.to_sym].subscribe {|*data| yield(*data) } + end + + # should the node accept new incoming connections? + def accept_connections? + connections.select(&:incoming?).size >= config[:max][:connections_in] end end end