lib/bitcoin/storage/sequel/sequel_store.rb in bitcoin-ruby-0.0.5 vs lib/bitcoin/storage/sequel/sequel_store.rb in bitcoin-ruby-0.0.6

- old
+ new

@@ -5,17 +5,26 @@ module Bitcoin::Storage::Backends # Storage backend using Sequel to connect to arbitrary SQL databases. # Inherits from StoreBase and implements its interface. - class SequelStore < StoreBase + class SequelStore < SequelStoreBase # sequel database connection attr_accessor :db - DEFAULT_CONFIG = { mode: :full, cache_head: false } + DEFAULT_CONFIG = { + # TODO + mode: :full, + # cache head block. only the instance that is updating the head should do this. + cache_head: false, + + # store an index of tx.nhash values + index_nhash: false + } + # create sequel store with given +config+ def initialize config, *args super config, *args end @@ -54,37 +63,42 @@ else block_id = @db[:blk].insert(attrs) blk_tx, new_tx, addrs, names = [], [], [], [] # store tx + existing_tx = Hash[*@db[:tx].filter(hash: blk.tx.map {|tx| tx.hash.htb.blob }).map { |tx| [tx[:hash].hth, tx[:id]] }.flatten] blk.tx.each.with_index do |tx, idx| - existing = @db[:tx][hash: tx.hash.htb.blob] - existing ? blk_tx[idx] = existing[:id] : new_tx << [tx, idx] + existing = existing_tx[tx.hash] + existing ? blk_tx[idx] = existing : new_tx << [tx, idx] end - new_tx_ids = @db[:tx].insert_multiple(new_tx.map {|tx, _| tx_data(tx) }) + + new_tx_ids = fast_insert(:tx, new_tx.map {|tx, _| tx_data(tx) }, return_ids: true) new_tx_ids.each.with_index {|tx_id, idx| blk_tx[new_tx[idx][1]] = tx_id } - @db[:blk_tx].insert_multiple(blk_tx.map.with_index {|id, idx| - { blk_id: block_id, tx_id: id, idx: idx } }) + fast_insert(:blk_tx, blk_tx.map.with_index {|id, idx| { blk_id: block_id, tx_id: id, idx: idx } }) # store txins - txin_ids = @db[:txin].insert_multiple(new_tx.map.with_index {|tx, tx_idx| + fast_insert(:txin, new_tx.map.with_index {|tx, tx_idx| tx, _ = *tx tx.in.map.with_index {|txin, txin_idx| - txin_data(new_tx_ids[tx_idx], txin, txin_idx) } }.flatten) + p2sh_type = nil + if @config[:index_p2sh_type] && !txin.coinbase? && (script = tx.scripts[txin_idx]) && script.is_p2sh? 
+ p2sh_type = Bitcoin::Script.new(script.inner_p2sh_script).type + end + txin_data(new_tx_ids[tx_idx], txin, txin_idx, p2sh_type) } }.flatten) # store txouts txout_i = 0 - txout_ids = @db[:txout].insert_multiple(new_tx.map.with_index {|tx, tx_idx| + txout_ids = fast_insert(:txout, new_tx.map.with_index {|tx, tx_idx| tx, _ = *tx tx.out.map.with_index {|txout, txout_idx| - script_type, a, n = *parse_script(txout, txout_i, tx.hash) + script_type, a, n = *parse_script(txout, txout_i, tx.hash, txout_idx) addrs += a; names += n; txout_i += 1 - txout_data(new_tx_ids[tx_idx], txout, txout_idx, script_type) } }.flatten) + txout_data(new_tx_ids[tx_idx], txout, txout_idx, script_type) } }.flatten, return_ids: true) # store addrs - persist_addrs addrs.map {|i, h| [txout_ids[i], h]} + persist_addrs addrs.map {|i, addr| [txout_ids[i], addr]} names.each {|i, script| store_name(script, txout_ids[i]) } end @head = wrap_block(attrs.merge(id: block_id)) if chain == MAIN @db[:blk].where(:prev_hash => blk.hash.htb.blob, :chain => ORPHAN).each do |b| log.debug { "connecting orphan #{b[:hash].hth}" } @@ -99,69 +113,74 @@ end def reorg new_side, new_main @db.transaction do @db[:blk].where(hash: new_side.map {|h| h.htb.blob }).update(chain: SIDE) - new_main.each {|b| get_block(b).validator(self).validate(raise_errors: true) } unless @config[:skip_validation] - @db[:blk].where(hash: new_main.map {|h| h.htb.blob }).update(chain: MAIN) - end - end - - # parse script and collect address/txout mappings to index - def parse_script txout, i, tx_hash = "" - addrs, names = [], [] - - script = Bitcoin::Script.new(txout.pk_script) rescue nil - if script - if script.is_hash160? || script.is_pubkey? - addrs << [i, script.get_hash160] - elsif script.is_multisig? 
- script.get_multisig_pubkeys.map do |pubkey| - addrs << [i, Bitcoin.hash160(pubkey.unpack("H*")[0])] + new_main.each do |block_hash| + unless @config[:skip_validation] + get_block(block_hash).validator(self).validate(raise_errors: true) end - elsif Bitcoin.namecoin? && script.is_namecoin? - addrs << [i, script.get_hash160] - names << [i, script] - else - log.info { "Unknown script type in #{tx_hash}:#{i}" } - log.debug { script.to_string } + @db[:blk].where(hash: block_hash.htb.blob).update(chain: MAIN) end - script_type = SCRIPT_TYPES.index(script.type) - else - log.error { "Error parsing script #{tx_hash}:#{i}" } - script_type = SCRIPT_TYPES.index(:unknown) end - [script_type, addrs, names] end # bulk-store addresses and txout mappings def persist_addrs addrs addr_txouts, new_addrs = [], [] - addrs.group_by {|_, a| a }.each do |hash160, txouts| - if existing = @db[:addr][:hash160 => hash160] - txouts.each {|id, _| addr_txouts << [existing[:id], id] } + + # find addresses that are already there + existing_addr = {} + addrs.each do |i, addr| + hash160 = Bitcoin.hash160_from_address(addr) + type = Bitcoin.address_type(addr) + if existing = @db[:addr][hash160: hash160, type: ADDRESS_TYPES.index(type)] + existing_addr[[hash160, type]] = existing[:id] + end + end + + # iterate over all txouts, grouped by hash160 + addrs.group_by {|_, a| a }.each do |addr, txouts| + hash160 = Bitcoin.hash160_from_address(addr) + type = Bitcoin.address_type(addr) + + if existing_id = existing_addr[[hash160, type]] + # link each txout to existing address + txouts.each {|id, _| addr_txouts << [existing_id, id] } else - new_addrs << [hash160, txouts.map {|id, _| id }] + # collect new address/txout mapping + new_addrs << [[hash160, type], txouts.map {|id, _| id }] end end - new_addr_ids = @db[:addr].insert_multiple(new_addrs.map {|hash160, txout_id| - { hash160: hash160 } }) + + # insert all new addresses + new_addr_ids = fast_insert(:addr, new_addrs.map {|hash160_and_type, txout_id| + 
hash160, type = *hash160_and_type + { hash160: hash160, type: ADDRESS_TYPES.index(type) } + }, return_ids: true) + + + # link each new txout to the new addresses new_addr_ids.each.with_index do |addr_id, idx| new_addrs[idx][1].each do |txout_id| addr_txouts << [addr_id, txout_id] end end - @db[:addr_txout].insert_multiple(addr_txouts.map {|addr_id, txout_id| - { addr_id: addr_id, txout_id: txout_id }}) + + # insert addr/txout links + fast_insert(:addr_txout, addr_txouts.map {|addr_id, txout_id| { addr_id: addr_id, txout_id: txout_id }}) end # prepare transaction data for storage def tx_data tx - { hash: tx.hash.htb.blob, + data = { + hash: tx.hash.htb.blob, version: tx.ver, lock_time: tx.lock_time, coinbase: tx.in.size == 1 && tx.in[0].coinbase?, tx_size: tx.payload.bytesize } + data[:nhash] = tx.nhash.htb.blob if @config[:index_nhash] + data end # store transaction +tx+ def store_tx(tx, validate = true) @log.debug { "Storing tx #{tx.hash} (#{tx.to_payload.bytesize} bytes)" } @@ -175,21 +194,25 @@ tx_id end end # prepare txin data for storage - def txin_data tx_id, txin, idx - { tx_id: tx_id, tx_idx: idx, + def txin_data tx_id, txin, idx, p2sh_type = nil + data = { + tx_id: tx_id, tx_idx: idx, script_sig: txin.script_sig.blob, prev_out: txin.prev_out.blob, prev_out_index: txin.prev_out_index, - sequence: txin.sequence.unpack("V")[0] } + sequence: txin.sequence.unpack("V")[0], + } + data[:p2sh_type] = SCRIPT_TYPES.index(p2sh_type) if @config[:index_p2sh_type] + data end # store input +txin+ - def store_txin(tx_id, txin, idx) - @db[:txin].insert(txin_data(tx_id, txin, idx)) + def store_txin(tx_id, txin, idx, p2sh_type = nil) + @db[:txin].insert(txin_data(tx_id, txin, idx, p2sh_type)) end # prepare txout data for storage def txout_data tx_id, txout, idx, script_type { tx_id: tx_id, tx_idx: idx, @@ -197,11 +220,11 @@ value: txout.value, type: script_type } end # store output +txout+ def store_txout(tx_id, txout, idx, tx_hash = "") - script_type, addrs, names = 
*parse_script(txout, idx, tx_hash) + script_type, addrs, names = *parse_script(txout, idx, tx_hash, idx) txout_id = @db[:txout].insert(txout_data(tx_id, txout, idx, script_type)) persist_addrs addrs.map {|i, h| [txout_id, h] } names.each {|i, script| store_name(script, txout_id) } txout_id end @@ -216,13 +239,13 @@ tx.out.each {|o| @db[:txout].where(:id => o.id).delete } @db[:tx].where(:id => tx.id).delete end end - # check if block +blk_hash+ exists + # check if block +blk_hash+ exists in the main chain def has_block(blk_hash) - !!@db[:blk].where(:hash => blk_hash.htb.blob).get(1) + !!@db[:blk].where(:hash => blk_hash.htb.blob, :chain => 0).get(1) end # check if transaction +tx_hash+ exists def has_tx(tx_hash) !!@db[:tx].where(:hash => tx_hash.htb.blob).get(1) @@ -275,15 +298,35 @@ # get block by given +id+ def get_block_by_id(block_id) wrap_block(@db[:blk][:id => block_id]) end + # get block id in the main chain by given +tx_id+ + def get_block_id_for_tx_id(tx_id) + @db[:blk_tx].join(:blk, id: :blk_id) + .where(tx_id: tx_id, chain: MAIN).first[:blk_id] rescue nil + end + # get transaction for given +tx_hash+ def get_tx(tx_hash) wrap_tx(@db[:tx][:hash => tx_hash.htb.blob]) end + # get array of txes with given +tx_hashes+ + def get_txs(tx_hashes) + txs = db[:tx].filter(hash: tx_hashes.map{|h| h.htb.blob}) + txs_ids = txs.map {|tx| tx[:id]} + return [] if txs_ids.empty? 
+ 
+ # we fetch all needed block ids, inputs and outputs to avoid doing number of queries proportional to number of transactions + block_ids = Hash[*db[:blk_tx].join(:blk, id: :blk_id).filter(tx_id: txs_ids, chain: 0).map {|b| [b[:tx_id], b[:blk_id]] }.flatten] + inputs = db[:txin].filter(:tx_id => txs_ids).order(:tx_idx).map.group_by{ |txin| txin[:tx_id] } + outputs = db[:txout].filter(:tx_id => txs_ids).order(:tx_idx).map.group_by{ |txout| txout[:tx_id] } + + txs.map {|tx| wrap_tx(tx, block_ids[tx[:id]], inputs: inputs[tx[:id]], outputs: outputs[tx[:id]]) } + end + # get transaction by given +tx_id+ def get_tx_by_id(tx_id) wrap_tx(@db[:tx][:id => tx_id]) end @@ -292,10 +335,15 @@ def get_txin_for_txout(tx_hash, txout_idx) tx_hash = tx_hash.htb_reverse.blob wrap_txin(@db[:txin][:prev_out => tx_hash, :prev_out_index => txout_idx]) end + # optimized version of Storage#get_txins_for_txouts + def get_txins_for_txouts(txouts) + @db[:txin].filter([:prev_out, :prev_out_index] => txouts.map{|tx_hash, tx_idx| [tx_hash.htb_reverse.blob, tx_idx]}).map{|i| wrap_txin(i)} + end + def get_txout_by_id(txout_id) wrap_txout(@db[:txout][:id => txout_id]) end # get corresponding Models::TxOut for +txin+ @@ -310,18 +358,18 @@ txouts = @db[:txout].filter(:pk_script => script.blob).order(:id) txouts.map{|txout| wrap_txout(txout)} end # get all Models::TxOut matching given +hash160+ - def get_txouts_for_hash160(hash160, unconfirmed = false) - addr = @db[:addr][:hash160 => hash160] + def get_txouts_for_hash160(hash160, type = :hash160, unconfirmed = false) + addr = @db[:addr][hash160: hash160, type: ADDRESS_TYPES.index(type)] return [] unless addr - txouts = @db[:addr_txout].where(:addr_id => addr[:id]) - .map{|t| @db[:txout][:id => t[:txout_id]] } + txouts = @db[:addr_txout].where(addr_id: addr[:id]) + .map{|t| @db[:txout][id: t[:txout_id]] } .map{|o| wrap_txout(o) } unless unconfirmed - txouts.select!{|o| @db[:blk][:id => o.get_tx.blk_id][:chain] == MAIN rescue false } + 
txouts.select!{|o| @db[:blk][id: o.get_tx.blk_id][:chain] == MAIN rescue false } end txouts end def get_txouts_for_name_hash(hash) @@ -356,42 +404,48 @@ blk.bits = block[:bits] blk.nonce = block[:nonce] blk.aux_pow = Bitcoin::P::AuxPow.new(block[:aux_pow]) if block[:aux_pow] - db[:blk_tx].filter(blk_id: block[:id]).join(:tx, id: :tx_id) - .order(:idx).each {|tx| blk.tx << wrap_tx(tx, block[:id]) } + blk_tx = db[:blk_tx].filter(blk_id: block[:id]).join(:tx, id: :tx_id).order(:idx) - blk.recalc_block_hash + # fetch inputs and outputs for all transactions in the block to avoid additional queries for each transaction + inputs = db[:txin].filter(:tx_id => blk_tx.map{ |tx| tx[:id] }).order(:tx_idx).map.group_by{ |txin| txin[:tx_id] } + outputs = db[:txout].filter(:tx_id => blk_tx.map{ |tx| tx[:id] }).order(:tx_idx).map.group_by{ |txout| txout[:tx_id] } + + blk.tx = blk_tx.map { |tx| wrap_tx(tx, block[:id], inputs: inputs[tx[:id]], outputs: outputs[tx[:id]]) } + + blk.hash = block[:hash].hth blk end # wrap given +transaction+ into Models::Transaction - def wrap_tx(transaction, block_id = nil) + def wrap_tx(transaction, block_id = nil, prefetched = {}) return nil unless transaction block_id ||= @db[:blk_tx].join(:blk, id: :blk_id) .where(tx_id: transaction[:id], chain: 0).first[:blk_id] rescue nil data = {id: transaction[:id], blk_id: block_id, size: transaction[:tx_size], idx: transaction[:idx]} tx = Bitcoin::Storage::Models::Tx.new(self, data) - inputs = db[:txin].filter(:tx_id => transaction[:id]).order(:tx_idx) + inputs = prefetched[:inputs] || db[:txin].filter(:tx_id => transaction[:id]).order(:tx_idx) inputs.each { |i| tx.add_in(wrap_txin(i)) } - outputs = db[:txout].filter(:tx_id => transaction[:id]).order(:tx_idx) + outputs = prefetched[:outputs] || db[:txout].filter(:tx_id => transaction[:id]).order(:tx_idx) outputs.each { |o| tx.add_out(wrap_txout(o)) } tx.ver = transaction[:version] tx.lock_time = transaction[:lock_time] - tx.hash = 
tx.hash_from_payload(tx.to_payload) + tx.hash = transaction[:hash].hth tx end # wrap given +input+ into Models::TxIn def wrap_txin(input) return nil unless input - data = {:id => input[:id], :tx_id => input[:tx_id], :tx_idx => input[:tx_idx]} + data = { :id => input[:id], :tx_id => input[:tx_id], :tx_idx => input[:tx_idx], + :p2sh_type => input[:p2sh_type] ? SCRIPT_TYPES[input[:p2sh_type]] : nil } txin = Bitcoin::Storage::Models::TxIn.new(self, data) txin.prev_out = input[:prev_out] txin.prev_out_index = input[:prev_out_index] txin.script_sig_length = input[:script_sig].bytesize txin.script_sig = input[:script_sig] @@ -444,9 +498,53 @@ # total = 0 # txouts.each do |txout| # tx = txout.get_tx # total += txout.value # end + end + + protected + + # Abstraction for doing many quick inserts. + # + # * +table+ - db table name + # * +data+ - an array of hashes with the same keys + # * +opts+ + # ** return_ids - if true, an array of inserted row ids will be returned + def fast_insert(table, data, opts={}) + return [] if data.empty? + # For postgres we are using COPY which is much faster than separate INSERTs + if @db.adapter_scheme == :postgres + + columns = data.first.keys + if opts[:return_ids] + ids = db.transaction do + # COPY does not return ids, so we set ids manually based on current sequence value + # We lock the table to avoid inserts that could happen in the middle of COPY + db.execute("LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE") + first_id = db.fetch("SELECT nextval('#{table}_id_seq') AS id").first[:id] + + # Blobs need to be represented in the hex form (yes, we do hth on them earlier, could be improved) + # \\x is the format of bytea as hex encoding in postgres + csv = data.map.with_index{|x,i| [first_id + i, columns.map{|c| x[c].kind_of?(Sequel::SQL::Blob) ? 
"\\x#{x[c].hth}" : x[c]}].join(',')}.join("\n") + db.copy_into(table, columns: [:id] + columns, format: :csv, data: csv) + last_id = first_id + data.size - 1 + + # Set sequence value to max id, last arg true means it will be incremented before next value + db.execute("SELECT setval('#{table}_id_seq', #{last_id}, true)") + (first_id..last_id).to_a # returned ids + end + else + csv = data.map{|x| columns.map{|c| x[c].kind_of?(Sequel::SQL::Blob) ? "\\x#{x[c].hth}" : x[c]}.join(',')}.join("\n") + @db.copy_into(table, format: :csv, columns: columns, data: csv) + end + + else + + # Life is simple when your are not optimizing ;) + @db[table].insert_multiple(data) + + end end end end