lib/bitcoin/network/node.rb in bitcoin-ruby-0.0.1 vs lib/bitcoin/network/node.rb in bitcoin-ruby-0.0.2
- old
+ new
@@ -1,5 +1,7 @@
+# encoding: ascii-8bit
+
Bitcoin.require_dependency :eventmachine
Bitcoin.require_dependency :json
require 'fileutils'
module Bitcoin::Network
@@ -34,169 +36,257 @@
attr_reader :addrs
# clients to be notified for new block/tx events
attr_reader :notifiers
- attr_reader :in_sync
+ # our external ip addresses we got told by peers
+ attr_accessor :external_ips
+ # time when the last main chain block was added
+ attr_reader :last_block_time
+
+ attr_accessor :relay_tx
+ attr_accessor :relay_propagation
+
+
DEFAULT_CONFIG = {
+ :network => :bitcoin,
:listen => ["0.0.0.0", Bitcoin.network[:default_port]],
:connect => [],
- :command => "",
- :storage => Bitcoin::Storage.dummy({}),
- :headers_only => false,
+ :command => ["127.0.0.1", 9999],
+ :storage => "utxo::sqlite://~/.bitcoin-ruby/<network>/blocks.db",
+ :mode => :full,
:dns => true,
- :epoll => false,
:epoll_limit => 10000,
:epoll_user => nil,
- :addr_file => "#{ENV['HOME']}/.bitcoin-ruby/addrs.json",
+ :addr_file => "~/.bitcoin-ruby/<network>/peers.json",
:log => {
:network => :info,
:storage => :info,
},
:max => {
+ :connections_out => 8,
+ :connections_in => 32,
:connections => 8,
:addr => 256,
- :queue => 64,
- :inv => 128,
- :inv_cache => 1024,
+ :queue => 501,
+ :inv => 501,
+ :inv_cache => 0,
+ :unconfirmed => 100,
},
:intervals => {
- :queue => 5,
- :inv_queue => 5,
+ :queue => 1,
+ :inv_queue => 1,
:addrs => 5,
- :connect => 15,
- :relay => 600,
+ :connect => 5,
+ :relay => 0,
},
+ :import => nil,
+ :skip_validation => false,
+ :check_blocks => 1000,
}
def initialize config = {}
@config = DEFAULT_CONFIG.deep_merge(config)
@log = Bitcoin::Logger.create(:network, @config[:log][:network])
- @connections = []
- @command_connections = []
- @queue = []
- @queue_thread = nil
- @inv_queue = []
- @inv_queue_thread = nil
+ @connections, @command_connections = [], []
+ @queue, @queue_thread, @inv_queue, @inv_queue_thread = [], nil, [], nil
set_store
load_addrs
@timers = {}
@inv_cache = []
- @notifiers = Hash[[:block, :tx, :connection, :addr].map {|n| [n, EM::Channel.new]}]
- @in_sync = false
+ @notifiers = {}
+ @relay_propagation, @last_block_time, @external_ips = {}, Time.now, []
+ @unconfirmed, @relay_tx = {}, {}
end
def set_store
backend, config = @config[:storage].split('::')
- @store = Bitcoin::Storage.send(backend, {:db => config}, ->(locator) {
+ @store = Bitcoin::Storage.send(backend, {
+ db: config, mode: @config[:mode], cache_head: true,
+ skip_validation: @config[:skip_validation],
+ log_level: @config[:log][:storage]}, ->(locator) {
peer = @connections.select(&:connected?).sample
peer.send_getblocks(locator)
})
@store.log.level = @config[:log][:storage]
+ @store.check_consistency(@config[:check_blocks])
+ if @config[:import]
+ @importing = true
+ EM.defer do
+ begin
+ @store.import(@config[:import]); @importing = false
+ rescue
+ log.fatal { $!.message }
+ puts *$@
+ stop
+ end
+ end
+ end
end
def load_addrs
- unless File.exist?(@config[:addr_file])
+ file = @config[:addr_file].sub("~", ENV["HOME"])
+ .sub("<network>", Bitcoin.network_name.to_s)
+ unless File.exist?(file)
@addrs = []
+ FileUtils.mkdir_p(File.dirname(file))
return
end
- @addrs = JSON.load(File.read(@config[:addr_file])).map do |a|
+ @addrs = JSON.load(File.read(file)).map do |a|
addr = Bitcoin::P::Addr.new
addr.time, addr.service, addr.ip, addr.port =
a['time'], a['service'], a['ip'], a['port']
addr
end
- log.info { "Initialized #{@addrs.size} addrs from #{@config[:addr_file]}." }
+ log.info { "Initialized #{@addrs.size} addrs from #{file}." }
+ rescue
+ @addrs = []
+ log.warn { "Error loading addrs from #{file}." }
end
def store_addrs
return if !@addrs || !@addrs.any?
- file = @config[:addr_file]
+ file = @config[:addr_file].sub("~", ENV["HOME"])
+ .sub("<network>", Bitcoin.network_name.to_s)
FileUtils.mkdir_p(File.dirname(file))
File.open(file, 'w') do |f|
addrs = @addrs.map {|a|
Hash[[:time, :service, :ip, :port].zip(a.entries)] rescue nil }.compact
f.write(JSON.pretty_generate(addrs))
end
- log.info { "Stored #{@addrs.size} addrs to #{file}" }
+ log.info { "Stored #{@addrs.size} addrs to #{file}." }
rescue
log.warn { "Error storing addrs to #{file}." }
end
def stop
- log.info { "Shutting down..." }
+ puts "Shutting down..."
+ stop_timers
EM.stop
end
def uptime
(Time.now - @started).to_i
end
+ def start_timers
+ return EM.add_timer(1) { start_timers } if @importing
+ [:queue, :inv_queue, :addrs, :connect, :relay].each do |name|
+ interval = @config[:intervals][name].to_f
+ next if !interval || interval == 0.0
+ @timers[name] = EM.add_periodic_timer(interval, method("work_#{name}"))
+ end
+ end
+
+ def stop_timers
+ @timers.each {|n, t| EM.cancel_timer t }
+ end
+
+ # initiate epoll with given file descriptor and set effective user
+ def epoll_init
+ log.info { "EPOLL: Available file descriptors: " +
+ EM.set_descriptor_table_size(@config[:epoll_limit]).to_s }
+ if @config[:epoll_user]
+ EM.set_effective_user(@config[:epoll_user])
+ log.info { "EPOLL: Effective user set to: #{@config[:epoll_user]}" }
+ end
+ EM.epoll = true
+ end
+
def run
@started = Time.now
EM.add_shutdown_hook do
store_addrs
log.info { "Bye" }
end
- init_epoll if @config[:epoll]
+ # enable kqueue (BSD, OS X)
+ if EM.kqueue?
+ log.info { 'Using BSD kqueue' }
+ EM.kqueue = true
+ end
+ # enable epoll (Linux)
+ if EM.epoll?
+ log.info { 'Using Linux epoll' }
+ epoll_init
+ end
+
EM.run do
- [:addrs, :connect, :relay].each do |name|
- interval = @config[:intervals][name]
- next if !interval || interval == 0
- @timers[name] = EM.add_periodic_timer(interval, method("work_#{name}"))
- end
- if @config[:command]
- host, port = @config[:command]
+ start_timers
+
+ host, port = *@config[:command]
+ port ||= Bitcoin.network[:default_port]
+ if host
+ log.debug { "Trying to bind command socket to #{host}:#{port}" }
EM.start_server(host, port, CommandHandler, self)
log.info { "Command socket listening on #{host}:#{port}" }
end
- if @config[:listen]
- host, port = @config[:listen]
- EM.start_server(host, port.to_i, ConnectionHandler, self, host, port.to_i)
+ host, port = *@config[:listen]
+ port ||= Bitcoin.network[:default_port]
+ if host
+ log.debug { "Trying to bind server socket to #{host}:#{port}" }
+ EM.start_server(host, port.to_i, ConnectionHandler, self, host, port.to_i, :in)
log.info { "Server socket listening on #{host}:#{port}" }
end
- if @config[:connect].any?
- @config[:connect].each{|host| connect_peer(*host) }
+ @config[:connect].each do |host, port|
+ port ||= Bitcoin.network[:default_port]
+ connect_peer(host, port)
+ log.info { "Connecting to #{host}:#{port}" }
end
work_connect if @addrs.any?
connect_dns if @config[:dns]
- work_inv_queue
- work_queue
+
+ Signal.trap("INT") do
+ puts "Shutting down. You can force-quit by pressing Ctrl-C again, but it might corrupt your database!"
+ Signal.trap("INT") do
+ puts "Force Quit"
+ exit 1
+ end
+ self.stop
+ end
+
end
end
# connect to peer at given +host+ / +port+
def connect_peer host, port
- return if @connections.map{|c| c.host}.include?(host)
- log.info { "Attempting to connect to #{host}:#{port}" }
- EM.connect(host, port.to_i, ConnectionHandler, self, host, port.to_i)
+ return if @connections.map{|c| c.host }.include?(host)
+ log.debug { "Attempting to connect to #{host}:#{port}" }
+ EM.connect(host, port.to_i, ConnectionHandler, self, host, port.to_i, :out)
rescue
- log.warn { "Error connecting to #{host}:#{port}" }
+ log.debug { "Error connecting to #{host}:#{port}" }
log.debug { $!.inspect }
end
# query addrs from dns seed and connect
def connect_dns
unless Bitcoin.network[:dns_seeds].any?
- return log.warn { "No DNS seed nodes available" }
+ log.warn { "No DNS seed nodes available" }
+ return connect_known_peers
end
connect_dns_resolver(Bitcoin.network[:dns_seeds].sample) do |addrs|
log.debug { "DNS returned addrs: #{addrs.inspect}" }
- addrs.sample(@config[:max][:connections] / 2).uniq.each do |addr|
+ addrs.sample(@config[:max][:connections_out] / 2).uniq.each do |addr|
connect_peer(addr, Bitcoin.network[:default_port])
end
end
end
+ def connect_known_peers
+ log.debug { "Attempting to connecting to known nodes" }
+ Bitcoin.network[:known_nodes].shuffle[0..3].each do |node|
+ connect_peer node, Bitcoin.network[:default_port]
+ end
+ end
+
# get peer addrs from given dns +seed+ using em/dns_resolver.
# fallback to using `nslookup` if it is not installed or fails.
def connect_dns_resolver(seed)
if Bitcoin.require_dependency "em/dns_resolver", gem: "em-dns", exit: false
log.info { "Querying addresses from DNS seed: #{seed}" }
@@ -223,11 +313,11 @@
# check if there are enough connections and try to
# establish new ones if needed
def work_connect
log.debug { "Connect worker running" }
- desired = @config[:max][:connections] - @connections.size
+ desired = @config[:max][:connections_out] - @connections.select(&:outgoing?).size
return if desired <= 0
desired = 32 if desired > 32 # connect to max 32 peers at once
if addrs.any?
addrs.sample(desired) do |addr|
Time.now.tv_sec + 10800 - addr.time
@@ -244,13 +334,14 @@
# query blocks from random peer
def getblocks locator = store.get_locator
peer = @connections.select(&:connected?).sample
return unless peer
log.info { "querying blocks from #{peer.host}:#{peer.port}" }
- if @config[:headers_only]
+ case @config[:mode]
+ when /lite/
peer.send_getheaders locator unless @queue.size >= @config[:max][:queue]
- else
+ when /full|pruned/
peer.send_getblocks locator unless @inv_queue.size >= @config[:max][:inv]
end
end
# check if the addr store is full and request new addrs
@@ -266,88 +357,122 @@
end
# check for new items in the queue and process them
def work_queue
@log.debug { "queue worker running" }
- EM.defer(nil, proc { work_queue }) do
- if @queue.size == 0
- getblocks if @inv_queue.size == 0 && !@in_sync
- sleep @config[:intervals][:queue]
+ return getblocks if @queue.size == 0
+
+ # switch off utxo cache once there aren't tons of new blocks coming in
+ if @store.in_sync?
+ if @store.is_a?(Bitcoin::Storage::Backends::UtxoStore) && @store.config[:utxo_cache] > 0
+ log.debug { "switching off utxo cache" }
+ @store.config[:utxo_cache] = 0
end
- while obj = @queue.shift
- begin
- if @store.send("store_#{obj[0]}", obj[1])
- if obj[0].to_sym == :block
- block = @store.get_block(obj[1].hash)
- @notifiers[:block].push([obj[1], block.depth]) if block.chain == 0
- else
- @notifiers[:tx].push([obj[1]])
+ @config[:intervals].each do |name, value|
+ if value <= 1
+ log.debug { "setting #{name} interval to 5 seconds" }
+ @config[:intervals][name] = 5
+ end
+ end
+ end
+
+ while obj = @queue.shift
+ begin
+ if obj[0].to_sym == :block
+ if res = @store.send("new_#{obj[0]}", obj[1])
+ if res[1] == 0 && obj[1].hash == @store.get_head.hash
+ @last_block_time = Time.now
+ push_notification(:block, [obj[1], res[0]])
+ obj[1].tx.each {|tx| @unconfirmed.delete(tx.hash) }
end
+ getblocks if res[1] == 2 && @store.in_sync?
end
- rescue
- @log.warn { $!.inspect }
- puts *$@
+ else
+ drop = @unconfirmed.size - @config[:max][:unconfirmed] + 1
+ drop.times { @unconfirmed.shift } if drop > 0
+ unless @unconfirmed[obj[1].hash]
+ @unconfirmed[obj[1].hash] = obj[1]
+ push_notification(:tx, [obj[1], 0])
+
+ if @notifiers[:output]
+ obj[1].out.each do |out|
+ address = Bitcoin::Script.new(out.pk_script).get_address
+ push_notification(:output, [obj[1].hash, address, out.value, 0])
+ end
+ end
+ end
end
+ rescue Bitcoin::Validation::ValidationError
+ @log.warn { "ValiationError storing #{obj[0]} #{obj[1].hash}: #{$!.message}" }
+ # File.open("./validation_error_#{obj[0]}_#{obj[1].hash}.bin", "w") {|f|
+ # f.write(obj[1].to_payload) }
+ # EM.stop
+ rescue
+ @log.warn { $!.inspect }
+ puts *$@
end
- @in_sync = (@store.get_head && (Time.now - @store.get_head.time).to_i < 3600) ? true : false
end
end
# check for new items in the inv queue and process them,
# unless the queue is already full
def work_inv_queue
- EM.defer(nil, proc { work_inv_queue }) do
- sleep @config[:intervals][:inv_queue] if @inv_queue.size == 0
- @log.debug { "inv queue worker running" }
- if @queue.size >= @config[:max][:queue]
- sleep @config[:intervals][:inv_queue]
- else
- while inv = @inv_queue.shift
- next if !@in_sync && inv[0] == :tx
- next if @queue.map{|i|i[1]}.map(&:hash).include?(inv[1])
- # next if @store.send("has_#{inv[0]}", inv[1])
- inv[2].send("send_getdata_#{inv[0]}", inv[1])
- end
- end
+ @log.debug { "inv queue worker running" }
+ return if @inv_queue.size == 0
+ return if @queue.size >= @config[:max][:queue]
+ while inv = @inv_queue.shift
+ next if !@store.in_sync? && inv[0] == :tx && @notifiers.empty?
+ next if @queue.map{|i|i[1]}.map(&:hash).include?(inv[1])
+ inv[2].send("send_getdata_#{inv[0]}", inv[1])
end
end
# queue inv, caching the most current ones
def queue_inv inv
- @inv_cache.shift(128) if @inv_cache.size > @config[:max][:inv_cache]
- return if @inv_cache.include?([inv[0], inv[1]]) ||
- @inv_queue.size >= @config[:max][:inv] ||
- (!@in_sync && inv[0] == :tx)
- @inv_cache << [inv[0], inv[1]]
- @inv_queue << inv
- end
+ hash = inv[1].unpack("H*")[0]
+ return if @inv_queue.include?(inv) || @queue.select {|i| i[1].hash == hash }.any?
+ return if @store.send("has_#{inv[0]}", hash)
- # initiate epoll with given file descriptor and set effective user
- def init_epoll
- log.info { "EPOLL: Available file descriptors: " +
- EM.set_descriptor_table_size(@config[:epoll_limit]).to_s }
- if @config[:epoll_user]
- EM.set_effective_user(@config[:epoll_user])
- log.info { "EPOLL: Effective user set to: #{@config[:epoll_user]}" }
- end
- EM.epoll
+# @inv_cache.shift(128) if @inv_cache.size > @config[:max][:inv_cache]
+# return if @inv_cache.include?([inv[0], inv[1]]) ||
+# @inv_queue.size >= @config[:max][:inv] ||
+# (!@store.in_sync? && inv[0] == :tx)
+# @inv_cache << [inv[0], inv[1]]
+ @inv_queue << inv
end
- def relay_tx(tx)
- return false unless @in_sync
- @store.store_tx(tx)
- @connections.select(&:connected?).sample((@connections.size / 2) + 1).each do |peer|
- peer.send_inv(:tx, tx)
- end
- end
-
def work_relay
log.debug { "relay worker running" }
@store.get_unconfirmed_tx.each do |tx|
- log.info { "relaying tx #{tx.hash}" }
relay_tx(tx)
end
+ end
+
+ # get the external ip that was suggested in version messages
+ # from other peers most often.
+ def external_ip
+ @external_ips.group_by(&:dup).values.max_by(&:size).first
+ rescue
+ @config[:listen][0]
+ end
+
+ # push notification +message+ to +channel+
+ def push_notification channel, message
+ @notifiers[channel.to_sym].push(message) if @notifiers[channel.to_sym]
+ end
+
+ # subscribe to notification +channel+.
+ # available channels are: block, tx, output, connection.
+ # see CommandHandler for details.
+ def subscribe channel
+ @notifiers[channel.to_sym] ||= EM::Channel.new
+ @notifiers[channel.to_sym].subscribe {|*data| yield(*data) }
+ end
+
+ # should the node accept new incoming connections?
+ def accept_connections?
+ connections.select(&:incoming?).size >= config[:max][:connections_in]
end
end
end