lib/etht/bigquery.rb in etht-0.1.4 vs lib/etht/bigquery.rb in etht-0.1.5

- old
+ new

@@ -1,103 +1,153 @@ # frozen_string_literal: true -require 'google/cloud/bigquery' +require('google/cloud/bigquery') +require('bigdecimal/util') # @author Hernani Rodrigues Vaz module Etht - DF = '%Y-%m-%d' - DI = '%Y-%m-%d %H:%M:%S' + BD = 'hernanirvaz.coins' - # (see Bigquery) + # classe para processar etherscan & bigquery class Bigquery # @return [Google::Cloud::Bigquery] API bigquery - attr_reader :apibq - # @return [Etherscan::Api] API etherscan - attr_reader :apies - - # @return [Array] row folha calculo em processamento - attr_reader :row + attr_reader :api # @return [Google::Cloud::Bigquery::QueryJob] job bigquery attr_reader :job - # @return (see sql_select) - attr_reader :sql + # @return [Thor::CoreExt::HashWithIndifferentAccess] opcoes trabalho + attr_reader :ops - # @param [String] csv folha calculo para processar - # @param [Hash<Symbol, Boolean>] ops opcoes trabalho com linhas - # @option ops [Boolean] :e (false) apaga linha igual? - # @option ops [Boolean] :m (false) apaga linhas existencia multipla? - # @option ops [Boolean] :i (false) insere linha nova? - # @return [Bigquery] acesso folhas calculo bloks.io & correspondente bigquery dataset - def initialize(ops = { e: false, m: false, i: false }) + # @param [Thor::CoreExt::HashWithIndifferentAccess] pop opcoes trabalho + # @option pop [Hash] :h ({}) configuracao ajuste reposicionamento temporal + # @option pop [Boolean] :v (false) mostra dados transacoes normais & token? + # @return [Bigquery] API bigquery & API etherscan + def initialize(pop) # usa env GOOGLE_APPLICATION_CREDENTIALS para obter credentials # @see https://cloud.google.com/bigquery/docs/authentication/getting-started - @apibq = Google::Cloud::Bigquery.new - @apies = Etht::Accounts.new - @linha = ops + @api = Google::Cloud::Bigquery.new + @ops = pop end - def processa_eth - contract_address = '0xfc325129a11fab241287e42a9f04a74f14077b77' - p apies.normal_transactions(contract_address) + # @return [Carteiras] API etherscan - processar transacoes normais e tokens + def carteiras + @carteiras ||= Carteiras.new( + { + wb: sql("select * from #{BD}.walletEth order by 2") + .map { |e| { id: e[:id], ax: e[:address], sl: e[:saldo].to_d.round(10) } }, + nt: sql("select blocknumber,iax from #{BD}.ethtx"), + nk: sql("select blocknumber,iax from #{BD}.ethkx") + }, + ops + ) end - # cria job bigquery & verifica execucao - # - # @param [String] sql a executar - # @return [Boolean] job ok? - def job_bigquery?(sql) - @job = apibq.query_job(sql) - @job.wait_until_done! - puts @job.error['message'] if @job.failed? - @job.failed? + # insere transacoes novas nas tabelas etht, ethk + def processa + puts(format("%<n>2i LINHAS INSERIDAS #{BD}.etht", n: carteiras.novaest.count.positive? ? dml(etht_ins) : 0)) + puts(format("%<n>2i LINHAS INSERIDAS #{BD}.ethk", n: carteiras.novaesk.count.positive? ? dml(ethk_ins) : 0)) end - # cria Data Manipulation Language (DML) job bigquery - # - # @param (see job_bigquery?) - # @return [Integer] numero linhas afetadas - def dml(sql) - job_bigquery?(sql) ? 0 : job.num_dml_affected_rows + # @return [String] comando insert SQL formatado bigquery.etht + def etht_ins + "insert #{BD}.etht(blocknumber,timestamp,txhash,nonce,blockhash,transactionindex,axfrom,axto,value,gas," \ + 'gasprice,iserror,txreceipt_status,input,contractaddress,cumulativegasused,gasused,confirmations,dias' \ + ") VALUES(#{carteiras.novaest.map { |e| etht_val1(e) }.join(',')})" end - # pesquisa existencia linha folha calculo no bigquery - # - # @return [Google::Cloud::Bigquery::Data] resultado do sql num array<hash> - def sql_select - # array.count = 0 ==> pode carregar esta linha - # array.count >= 1 ==> nao carregar esta linha - @sql = job_bigquery?('select ' + eos_fields + ' ' + sql_where) ? [{}, {}] : job.data + # @return [String] valores formatados bigquery.etht parte1 + def etht_val1(hes) + "#{Integer(hes['blockNumber'])}," \ + "#{Integer(hes['timeStamp'])}," \ + "'#{hes['hash']}'," \ + "#{Integer(hes['nonce'])}," \ + "'#{hes['blockHash']}'," \ + "#{Integer(hes['transactionIndex'])}," \ + "'#{hes['from']}'," \ + "'#{hes['to']}'," \ + "#{etht_val2(hes)}" end - # @return [String] parte sql para processamento linhas existentes - def sql_where - "from hernanirvaz.coins.eos where blocknumber=#{row[0]}" + # @return [String] valores formatados bigquery.etht parte2 + def etht_val2(hes) + "cast('#{hes['value']}' as numeric)," \ + "cast('#{hes['gas']}' as numeric)," \ + "cast('#{hes['gasPrice']}' as numeric)," \ + "#{Integer(hes['isError'])}," \ + "#{hes['txreceipt_status'].length.zero? ? 'null' : hes['txreceipt_status']}," \ + "#{hes['input'].length.zero? ? 'null' : "'#{hes['input']}'"}," \ + "#{etht_val3(hes)}" end - # @return [Integer] numero linhas inseridas - def sql_insert - return 1 unless linha[:i] + # @return [String] valores formatados bigquery.etht parte3 + def etht_val3(hes) + "#{hes['contractAddress'].length.zero? ? 'null' : "'#{hes['contractAddress']}'"}," \ + "0,cast('#{hes['gasUsed']}' as numeric),0," \ + "#{Integer(ops[:h][hes['blockNumber']] || 0)}" + end - dml('insert hernanirvaz.coins.eos(' + eos_fields + ') VALUES(' + str_insert1) + # @return [String] comando insert SQL formatado bigquery.ethk + def ethk_ins + "insert #{BD}.ethk(blocknumber,timestamp,txhash,nonce,blockhash,axfrom,contractaddress,axto,value,tokenname," \ + 'tokensymbol,tokendecimal,transactionindex,gas,gasprice,gasused,cumulativegasused,input,confirmations,dias' \ + ") VALUES(#{carteiras.novaesk.map { |e| ethk_val1(e) }.join(',')})" end - # @return [String] campos da tabela no bigquery - def eos_fields - 'blocknumber,time,contract,action,acfrom,acto,amount,symbol,memo,data,dias' + # @return [String] valores formatados bigquery.ethk parte1 + def ethk_val1(hes) + "#{Integer(hes['blockNumber'])}," \ + "#{Integer(hes['timeStamp'])}," \ + "'#{hes['hash']}'," \ + "#{Integer(hes['nonce'])}," \ + "'#{hes['blockHash']}'," \ + "'#{hes['from']}'," \ + "#{ethk_val2(hes)}" end - # @return [String] campos insert da linha bigquery - def str_insert1 - "#{row[0]},'#{DateTime.parse(row[1]).strftime(DI)}','#{row[2]}'," + str_insert2 + # @return [String] valores formatados bigquery.ethk parte2 + def ethk_val2(hes) + "#{hes['contractAddress'].length.zero? ? 'null' : "'#{hes['contractAddress']}'"}," \ + "'#{hes['to']}'," \ + "cast('#{hes['value']}' as numeric)," \ + "'#{hes['tokenName']}'," \ + "'#{hes['tokenSymbol']}'," \ + "#{Integer(hes['tokenDecimal'])}," \ + "#{Integer(hes['transactionIndex'])}," \ + "#{ethk_val3(hes)}" end - # @return [String] campos insert da linha bigquery - def str_insert2 - "'#{row[3]}','#{row[4]}','#{row[5]}',#{row[6].to_f},'#{row[7]}','#{row[8]}','#{row[9]}',0)" + # @return [String] valores formatados bigquery.ethk parte3 + def ethk_val3(hes) + "cast('#{hes['gas']}' as numeric)," \ + "cast('#{hes['gasPrice']}' as numeric)," \ + "cast('#{hes['gasUsed']}' as numeric),0," \ + "#{hes['input'].length.zero? ? 'null' : "'#{hes['input']}'"},0," \ + "#{Integer(ops[:h][hes['blockNumber']] || 0)}" end - # @return [Integer] numero linhas apagadas - def sql_delete - dml('delete ' + sql_where) + # cria job bigquery & verifica execucao + # + # @param [String] sql a executar + # @return [Boolean] job ok? + def job?(sql) + @job = api.query_job(sql) + @job.wait_until_done! + puts(@job.error['message']) if @job.failed? + @job.failed? + end + + # cria Data Manipulation Language (DML) job bigquery + # + # @param (see job?) + # @return [Integer] numero linhas afetadas + def dml(sql) + job?(sql) ? 0 : job.num_dml_affected_rows + end + + # cria Structured Query Language (SQL) job bigquery + # + # @param (see job?) + # @return [Array<Hash>] resultados do SQL + def sql(sql) + job?(sql) ? [] : job.data end end end