lib/bitcoin/storage/sequel/sequel_store.rb in bitcoin-ruby-0.0.5 vs lib/bitcoin/storage/sequel/sequel_store.rb in bitcoin-ruby-0.0.6
- old
+ new
@@ -5,17 +5,26 @@
module Bitcoin::Storage::Backends
# Storage backend using Sequel to connect to arbitrary SQL databases.
# Inherits from StoreBase and implements its interface.
- class SequelStore < StoreBase
+ class SequelStore < SequelStoreBase
# sequel database connection
attr_accessor :db
- DEFAULT_CONFIG = { mode: :full, cache_head: false }
+ DEFAULT_CONFIG = {
+ # TODO
+ mode: :full,
+      # Cache the head block. Only the instance that is updating the head should enable this.
+ cache_head: false,
+
+ # store an index of tx.nhash values
+ index_nhash: false
+ }
+
# create sequel store with given +config+
def initialize config, *args
super config, *args
end
@@ -54,37 +63,42 @@
else
block_id = @db[:blk].insert(attrs)
blk_tx, new_tx, addrs, names = [], [], [], []
# store tx
+ existing_tx = Hash[*@db[:tx].filter(hash: blk.tx.map {|tx| tx.hash.htb.blob }).map { |tx| [tx[:hash].hth, tx[:id]] }.flatten]
blk.tx.each.with_index do |tx, idx|
- existing = @db[:tx][hash: tx.hash.htb.blob]
- existing ? blk_tx[idx] = existing[:id] : new_tx << [tx, idx]
+ existing = existing_tx[tx.hash]
+ existing ? blk_tx[idx] = existing : new_tx << [tx, idx]
end
- new_tx_ids = @db[:tx].insert_multiple(new_tx.map {|tx, _| tx_data(tx) })
+
+ new_tx_ids = fast_insert(:tx, new_tx.map {|tx, _| tx_data(tx) }, return_ids: true)
new_tx_ids.each.with_index {|tx_id, idx| blk_tx[new_tx[idx][1]] = tx_id }
- @db[:blk_tx].insert_multiple(blk_tx.map.with_index {|id, idx|
- { blk_id: block_id, tx_id: id, idx: idx } })
+ fast_insert(:blk_tx, blk_tx.map.with_index {|id, idx| { blk_id: block_id, tx_id: id, idx: idx } })
# store txins
- txin_ids = @db[:txin].insert_multiple(new_tx.map.with_index {|tx, tx_idx|
+ fast_insert(:txin, new_tx.map.with_index {|tx, tx_idx|
tx, _ = *tx
tx.in.map.with_index {|txin, txin_idx|
- txin_data(new_tx_ids[tx_idx], txin, txin_idx) } }.flatten)
+ p2sh_type = nil
+ if @config[:index_p2sh_type] && !txin.coinbase? && (script = tx.scripts[txin_idx]) && script.is_p2sh?
+ p2sh_type = Bitcoin::Script.new(script.inner_p2sh_script).type
+ end
+ txin_data(new_tx_ids[tx_idx], txin, txin_idx, p2sh_type) } }.flatten)
# store txouts
txout_i = 0
- txout_ids = @db[:txout].insert_multiple(new_tx.map.with_index {|tx, tx_idx|
+ txout_ids = fast_insert(:txout, new_tx.map.with_index {|tx, tx_idx|
tx, _ = *tx
tx.out.map.with_index {|txout, txout_idx|
- script_type, a, n = *parse_script(txout, txout_i, tx.hash)
+ script_type, a, n = *parse_script(txout, txout_i, tx.hash, txout_idx)
addrs += a; names += n; txout_i += 1
- txout_data(new_tx_ids[tx_idx], txout, txout_idx, script_type) } }.flatten)
+ txout_data(new_tx_ids[tx_idx], txout, txout_idx, script_type) } }.flatten, return_ids: true)
# store addrs
- persist_addrs addrs.map {|i, h| [txout_ids[i], h]}
+ persist_addrs addrs.map {|i, addr| [txout_ids[i], addr]}
names.each {|i, script| store_name(script, txout_ids[i]) }
end
@head = wrap_block(attrs.merge(id: block_id)) if chain == MAIN
@db[:blk].where(:prev_hash => blk.hash.htb.blob, :chain => ORPHAN).each do |b|
log.debug { "connecting orphan #{b[:hash].hth}" }
@@ -99,69 +113,74 @@
end
def reorg new_side, new_main
@db.transaction do
@db[:blk].where(hash: new_side.map {|h| h.htb.blob }).update(chain: SIDE)
- new_main.each {|b| get_block(b).validator(self).validate(raise_errors: true) } unless @config[:skip_validation]
- @db[:blk].where(hash: new_main.map {|h| h.htb.blob }).update(chain: MAIN)
- end
- end
-
- # parse script and collect address/txout mappings to index
- def parse_script txout, i, tx_hash = ""
- addrs, names = [], []
-
- script = Bitcoin::Script.new(txout.pk_script) rescue nil
- if script
- if script.is_hash160? || script.is_pubkey?
- addrs << [i, script.get_hash160]
- elsif script.is_multisig?
- script.get_multisig_pubkeys.map do |pubkey|
- addrs << [i, Bitcoin.hash160(pubkey.unpack("H*")[0])]
+ new_main.each do |block_hash|
+ unless @config[:skip_validation]
+ get_block(block_hash).validator(self).validate(raise_errors: true)
end
- elsif Bitcoin.namecoin? && script.is_namecoin?
- addrs << [i, script.get_hash160]
- names << [i, script]
- else
- log.info { "Unknown script type in #{tx_hash}:#{i}" }
- log.debug { script.to_string }
+ @db[:blk].where(hash: block_hash.htb.blob).update(chain: MAIN)
end
- script_type = SCRIPT_TYPES.index(script.type)
- else
- log.error { "Error parsing script #{tx_hash}:#{i}" }
- script_type = SCRIPT_TYPES.index(:unknown)
end
- [script_type, addrs, names]
end
# bulk-store addresses and txout mappings
def persist_addrs addrs
addr_txouts, new_addrs = [], []
- addrs.group_by {|_, a| a }.each do |hash160, txouts|
- if existing = @db[:addr][:hash160 => hash160]
- txouts.each {|id, _| addr_txouts << [existing[:id], id] }
+
+ # find addresses that are already there
+ existing_addr = {}
+ addrs.each do |i, addr|
+ hash160 = Bitcoin.hash160_from_address(addr)
+ type = Bitcoin.address_type(addr)
+ if existing = @db[:addr][hash160: hash160, type: ADDRESS_TYPES.index(type)]
+ existing_addr[[hash160, type]] = existing[:id]
+ end
+ end
+
+ # iterate over all txouts, grouped by hash160
+ addrs.group_by {|_, a| a }.each do |addr, txouts|
+ hash160 = Bitcoin.hash160_from_address(addr)
+ type = Bitcoin.address_type(addr)
+
+ if existing_id = existing_addr[[hash160, type]]
+ # link each txout to existing address
+ txouts.each {|id, _| addr_txouts << [existing_id, id] }
else
- new_addrs << [hash160, txouts.map {|id, _| id }]
+ # collect new address/txout mapping
+ new_addrs << [[hash160, type], txouts.map {|id, _| id }]
end
end
- new_addr_ids = @db[:addr].insert_multiple(new_addrs.map {|hash160, txout_id|
- { hash160: hash160 } })
+
+ # insert all new addresses
+ new_addr_ids = fast_insert(:addr, new_addrs.map {|hash160_and_type, txout_id|
+ hash160, type = *hash160_and_type
+ { hash160: hash160, type: ADDRESS_TYPES.index(type) }
+ }, return_ids: true)
+
+
+ # link each new txout to the new addresses
new_addr_ids.each.with_index do |addr_id, idx|
new_addrs[idx][1].each do |txout_id|
addr_txouts << [addr_id, txout_id]
end
end
- @db[:addr_txout].insert_multiple(addr_txouts.map {|addr_id, txout_id|
- { addr_id: addr_id, txout_id: txout_id }})
+
+ # insert addr/txout links
+ fast_insert(:addr_txout, addr_txouts.map {|addr_id, txout_id| { addr_id: addr_id, txout_id: txout_id }})
end
# prepare transaction data for storage
def tx_data tx
- { hash: tx.hash.htb.blob,
+ data = {
+ hash: tx.hash.htb.blob,
version: tx.ver, lock_time: tx.lock_time,
coinbase: tx.in.size == 1 && tx.in[0].coinbase?,
tx_size: tx.payload.bytesize }
+ data[:nhash] = tx.nhash.htb.blob if @config[:index_nhash]
+ data
end
# store transaction +tx+
def store_tx(tx, validate = true)
@log.debug { "Storing tx #{tx.hash} (#{tx.to_payload.bytesize} bytes)" }
@@ -175,21 +194,25 @@
tx_id
end
end
# prepare txin data for storage
- def txin_data tx_id, txin, idx
- { tx_id: tx_id, tx_idx: idx,
+ def txin_data tx_id, txin, idx, p2sh_type = nil
+ data = {
+ tx_id: tx_id, tx_idx: idx,
script_sig: txin.script_sig.blob,
prev_out: txin.prev_out.blob,
prev_out_index: txin.prev_out_index,
- sequence: txin.sequence.unpack("V")[0] }
+ sequence: txin.sequence.unpack("V")[0],
+ }
+ data[:p2sh_type] = SCRIPT_TYPES.index(p2sh_type) if @config[:index_p2sh_type]
+ data
end
# store input +txin+
- def store_txin(tx_id, txin, idx)
- @db[:txin].insert(txin_data(tx_id, txin, idx))
+ def store_txin(tx_id, txin, idx, p2sh_type = nil)
+ @db[:txin].insert(txin_data(tx_id, txin, idx, p2sh_type))
end
# prepare txout data for storage
def txout_data tx_id, txout, idx, script_type
{ tx_id: tx_id, tx_idx: idx,
@@ -197,11 +220,11 @@
value: txout.value, type: script_type }
end
# store output +txout+
def store_txout(tx_id, txout, idx, tx_hash = "")
- script_type, addrs, names = *parse_script(txout, idx, tx_hash)
+ script_type, addrs, names = *parse_script(txout, idx, tx_hash, idx)
txout_id = @db[:txout].insert(txout_data(tx_id, txout, idx, script_type))
persist_addrs addrs.map {|i, h| [txout_id, h] }
names.each {|i, script| store_name(script, txout_id) }
txout_id
end
@@ -216,13 +239,13 @@
tx.out.each {|o| @db[:txout].where(:id => o.id).delete }
@db[:tx].where(:id => tx.id).delete
end
end
- # check if block +blk_hash+ exists
+ # check if block +blk_hash+ exists in the main chain
def has_block(blk_hash)
- !!@db[:blk].where(:hash => blk_hash.htb.blob).get(1)
+ !!@db[:blk].where(:hash => blk_hash.htb.blob, :chain => 0).get(1)
end
# check if transaction +tx_hash+ exists
def has_tx(tx_hash)
!!@db[:tx].where(:hash => tx_hash.htb.blob).get(1)
@@ -275,15 +298,35 @@
# get block by given +id+
def get_block_by_id(block_id)
wrap_block(@db[:blk][:id => block_id])
end
+ # get block id in the main chain by given +tx_id+
+ def get_block_id_for_tx_id(tx_id)
+ @db[:blk_tx].join(:blk, id: :blk_id)
+ .where(tx_id: tx_id, chain: MAIN).first[:blk_id] rescue nil
+ end
+
# get transaction for given +tx_hash+
def get_tx(tx_hash)
wrap_tx(@db[:tx][:hash => tx_hash.htb.blob])
end
+ # get array of txes with given +tx_hashes+
+ def get_txs(tx_hashes)
+ txs = db[:tx].filter(hash: tx_hashes.map{|h| h.htb.blob})
+ txs_ids = txs.map {|tx| tx[:id]}
+ return [] if txs_ids.empty?
+
+      # We fetch all needed block ids, inputs and outputs up front to avoid issuing a number of queries proportional to the number of transactions.
+ block_ids = Hash[*db[:blk_tx].join(:blk, id: :blk_id).filter(tx_id: txs_ids, chain: 0).map {|b| [b[:tx_id], b[:blk_id]] }.flatten]
+ inputs = db[:txin].filter(:tx_id => txs_ids).order(:tx_idx).map.group_by{ |txin| txin[:tx_id] }
+ outputs = db[:txout].filter(:tx_id => txs_ids).order(:tx_idx).map.group_by{ |txout| txout[:tx_id] }
+
+ txs.map {|tx| wrap_tx(tx, block_ids[tx[:id]], inputs: inputs[tx[:id]], outputs: outputs[tx[:id]]) }
+ end
+
# get transaction by given +tx_id+
def get_tx_by_id(tx_id)
wrap_tx(@db[:tx][:id => tx_id])
end
@@ -292,10 +335,15 @@
def get_txin_for_txout(tx_hash, txout_idx)
tx_hash = tx_hash.htb_reverse.blob
wrap_txin(@db[:txin][:prev_out => tx_hash, :prev_out_index => txout_idx])
end
+ # optimized version of Storage#get_txins_for_txouts
+ def get_txins_for_txouts(txouts)
+ @db[:txin].filter([:prev_out, :prev_out_index] => txouts.map{|tx_hash, tx_idx| [tx_hash.htb_reverse.blob, tx_idx]}).map{|i| wrap_txin(i)}
+ end
+
def get_txout_by_id(txout_id)
wrap_txout(@db[:txout][:id => txout_id])
end
# get corresponding Models::TxOut for +txin+
@@ -310,18 +358,18 @@
txouts = @db[:txout].filter(:pk_script => script.blob).order(:id)
txouts.map{|txout| wrap_txout(txout)}
end
# get all Models::TxOut matching given +hash160+
- def get_txouts_for_hash160(hash160, unconfirmed = false)
- addr = @db[:addr][:hash160 => hash160]
+ def get_txouts_for_hash160(hash160, type = :hash160, unconfirmed = false)
+ addr = @db[:addr][hash160: hash160, type: ADDRESS_TYPES.index(type)]
return [] unless addr
- txouts = @db[:addr_txout].where(:addr_id => addr[:id])
- .map{|t| @db[:txout][:id => t[:txout_id]] }
+ txouts = @db[:addr_txout].where(addr_id: addr[:id])
+ .map{|t| @db[:txout][id: t[:txout_id]] }
.map{|o| wrap_txout(o) }
unless unconfirmed
- txouts.select!{|o| @db[:blk][:id => o.get_tx.blk_id][:chain] == MAIN rescue false }
+ txouts.select!{|o| @db[:blk][id: o.get_tx.blk_id][:chain] == MAIN rescue false }
end
txouts
end
def get_txouts_for_name_hash(hash)
@@ -356,42 +404,48 @@
blk.bits = block[:bits]
blk.nonce = block[:nonce]
blk.aux_pow = Bitcoin::P::AuxPow.new(block[:aux_pow]) if block[:aux_pow]
- db[:blk_tx].filter(blk_id: block[:id]).join(:tx, id: :tx_id)
- .order(:idx).each {|tx| blk.tx << wrap_tx(tx, block[:id]) }
+ blk_tx = db[:blk_tx].filter(blk_id: block[:id]).join(:tx, id: :tx_id).order(:idx)
- blk.recalc_block_hash
+ # fetch inputs and outputs for all transactions in the block to avoid additional queries for each transaction
+ inputs = db[:txin].filter(:tx_id => blk_tx.map{ |tx| tx[:id] }).order(:tx_idx).map.group_by{ |txin| txin[:tx_id] }
+ outputs = db[:txout].filter(:tx_id => blk_tx.map{ |tx| tx[:id] }).order(:tx_idx).map.group_by{ |txout| txout[:tx_id] }
+
+ blk.tx = blk_tx.map { |tx| wrap_tx(tx, block[:id], inputs: inputs[tx[:id]], outputs: outputs[tx[:id]]) }
+
+ blk.hash = block[:hash].hth
blk
end
# wrap given +transaction+ into Models::Transaction
- def wrap_tx(transaction, block_id = nil)
+ def wrap_tx(transaction, block_id = nil, prefetched = {})
return nil unless transaction
block_id ||= @db[:blk_tx].join(:blk, id: :blk_id)
.where(tx_id: transaction[:id], chain: 0).first[:blk_id] rescue nil
data = {id: transaction[:id], blk_id: block_id, size: transaction[:tx_size], idx: transaction[:idx]}
tx = Bitcoin::Storage::Models::Tx.new(self, data)
- inputs = db[:txin].filter(:tx_id => transaction[:id]).order(:tx_idx)
+ inputs = prefetched[:inputs] || db[:txin].filter(:tx_id => transaction[:id]).order(:tx_idx)
inputs.each { |i| tx.add_in(wrap_txin(i)) }
- outputs = db[:txout].filter(:tx_id => transaction[:id]).order(:tx_idx)
+ outputs = prefetched[:outputs] || db[:txout].filter(:tx_id => transaction[:id]).order(:tx_idx)
outputs.each { |o| tx.add_out(wrap_txout(o)) }
tx.ver = transaction[:version]
tx.lock_time = transaction[:lock_time]
- tx.hash = tx.hash_from_payload(tx.to_payload)
+ tx.hash = transaction[:hash].hth
tx
end
# wrap given +input+ into Models::TxIn
def wrap_txin(input)
return nil unless input
- data = {:id => input[:id], :tx_id => input[:tx_id], :tx_idx => input[:tx_idx]}
+ data = { :id => input[:id], :tx_id => input[:tx_id], :tx_idx => input[:tx_idx],
+ :p2sh_type => input[:p2sh_type] ? SCRIPT_TYPES[input[:p2sh_type]] : nil }
txin = Bitcoin::Storage::Models::TxIn.new(self, data)
txin.prev_out = input[:prev_out]
txin.prev_out_index = input[:prev_out_index]
txin.script_sig_length = input[:script_sig].bytesize
txin.script_sig = input[:script_sig]
@@ -444,9 +498,53 @@
# total = 0
# txouts.each do |txout|
# tx = txout.get_tx
# total += txout.value
# end
+ end
+
+ protected
+
+ # Abstraction for doing many quick inserts.
+ #
+ # * +table+ - db table name
+ # * +data+ - a table of hashes with the same keys
+ # * +opts+
+    # ** return_ids - if true, an array of the inserted rows' ids will be returned
+ def fast_insert(table, data, opts={})
+ return [] if data.empty?
+ # For postgres we are using COPY which is much faster than separate INSERTs
+ if @db.adapter_scheme == :postgres
+
+ columns = data.first.keys
+ if opts[:return_ids]
+ ids = db.transaction do
+ # COPY does not return ids, so we set ids manually based on current sequence value
+ # We lock the table to avoid inserts that could happen in the middle of COPY
+ db.execute("LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE")
+ first_id = db.fetch("SELECT nextval('#{table}_id_seq') AS id").first[:id]
+
+          # Blobs need to be represented in hex form (yes, we do hth on them earlier; this could be improved).
+ # \\x is the format of bytea as hex encoding in postgres
+ csv = data.map.with_index{|x,i| [first_id + i, columns.map{|c| x[c].kind_of?(Sequel::SQL::Blob) ? "\\x#{x[c].hth}" : x[c]}].join(',')}.join("\n")
+ db.copy_into(table, columns: [:id] + columns, format: :csv, data: csv)
+ last_id = first_id + data.size - 1
+
+          # Set the sequence value to the max id; the last arg (true) means it will be incremented before the next value is returned.
+ db.execute("SELECT setval('#{table}_id_seq', #{last_id}, true)")
+ (first_id..last_id).to_a # returned ids
+ end
+ else
+ csv = data.map{|x| columns.map{|c| x[c].kind_of?(Sequel::SQL::Blob) ? "\\x#{x[c].hth}" : x[c]}.join(',')}.join("\n")
+ @db.copy_into(table, format: :csv, columns: columns, data: csv)
+ end
+
+ else
+
+        # Life is simple when you are not optimizing ;)
+ @db[table].insert_multiple(data)
+
+ end
end
end
end