lib/etht/bigquery.rb in etht-0.1.4 vs lib/etht/bigquery.rb in etht-0.1.5
- old
+ new
@@ -1,103 +1,153 @@
# frozen_string_literal: true
-require 'google/cloud/bigquery'
+require('google/cloud/bigquery')
+require('bigdecimal/util')
# @author Hernani Rodrigues Vaz
module Etht
- DF = '%Y-%m-%d'
- DI = '%Y-%m-%d %H:%M:%S'
+ BD = 'hernanirvaz.coins'
- # (see Bigquery)
+ # classe para processar etherscan & bigquery
class Bigquery
# @return [Google::Cloud::Bigquery] API bigquery
- attr_reader :apibq
- # @return [Etherscan::Api] API etherscan
- attr_reader :apies
-
- # @return [Array] row folha calculo em processamento
- attr_reader :row
+ attr_reader :api
# @return [Google::Cloud::Bigquery::QueryJob] job bigquery
attr_reader :job
- # @return (see sql_select)
- attr_reader :sql
+ # @return [Thor::CoreExt::HashWithIndifferentAccess] opcoes trabalho
+ attr_reader :ops
- # @param [String] csv folha calculo para processar
- # @param [Hash<Symbol, Boolean>] ops opcoes trabalho com linhas
- # @option ops [Boolean] :e (false) apaga linha igual?
- # @option ops [Boolean] :m (false) apaga linhas existencia multipla?
- # @option ops [Boolean] :i (false) insere linha nova?
- # @return [Bigquery] acesso folhas calculo bloks.io & correspondente bigquery dataset
- def initialize(ops = { e: false, m: false, i: false })
+ # @param [Thor::CoreExt::HashWithIndifferentAccess] pop opcoes trabalho
+ # @option pop [Hash] :h ({}) configuracao ajuste reposicionamento temporal
+ # @option pop [Boolean] :v (false) mostra dados transacoes normais & token?
+ # @return [Bigquery] API bigquery & API etherscan
+ def initialize(pop)
# usa env GOOGLE_APPLICATION_CREDENTIALS para obter credentials
# @see https://cloud.google.com/bigquery/docs/authentication/getting-started
- @apibq = Google::Cloud::Bigquery.new
- @apies = Etht::Accounts.new
- @linha = ops
+ @api = Google::Cloud::Bigquery.new
+ @ops = pop
end
- def processa_eth
- contract_address = '0xfc325129a11fab241287e42a9f04a74f14077b77'
- p apies.normal_transactions(contract_address)
+ # @return [Carteiras] API etherscan - processar transacoes normais e tokens
+ def carteiras
+ @carteiras ||= Carteiras.new(
+ {
+ wb: sql("select * from #{BD}.walletEth order by 2")
+ .map { |e| { id: e[:id], ax: e[:address], sl: e[:saldo].to_d.round(10) } },
+ nt: sql("select blocknumber,iax from #{BD}.ethtx"),
+ nk: sql("select blocknumber,iax from #{BD}.ethkx")
+ },
+ ops
+ )
end
- # cria job bigquery & verifica execucao
- #
- # @param [String] sql a executar
- # @return [Boolean] job ok?
- def job_bigquery?(sql)
- @job = apibq.query_job(sql)
- @job.wait_until_done!
- puts @job.error['message'] if @job.failed?
- @job.failed?
+ # insere transacoes novas nas tabelas etht, ethk
+ def processa
+ puts(format("%<n>2i LINHAS INSERIDAS #{BD}.etht", n: carteiras.novaest.count.positive? ? dml(etht_ins) : 0))
+ puts(format("%<n>2i LINHAS INSERIDAS #{BD}.ethk", n: carteiras.novaesk.count.positive? ? dml(ethk_ins) : 0))
end
- # cria Data Manipulation Language (DML) job bigquery
- #
- # @param (see job_bigquery?)
- # @return [Integer] numero linhas afetadas
- def dml(sql)
- job_bigquery?(sql) ? 0 : job.num_dml_affected_rows
+ # @return [String] comando insert SQL formatado bigquery.etht
+ def etht_ins
+ "insert #{BD}.etht(blocknumber,timestamp,txhash,nonce,blockhash,transactionindex,axfrom,axto,value,gas," \
+ 'gasprice,iserror,txreceipt_status,input,contractaddress,cumulativegasused,gasused,confirmations,dias' \
+ ") VALUES(#{carteiras.novaest.map { |e| etht_val1(e) }.join(',')})"
end
- # pesquisa existencia linha folha calculo no bigquery
- #
- # @return [Google::Cloud::Bigquery::Data] resultado do sql num array<hash>
- def sql_select
- # array.count = 0 ==> pode carregar esta linha
- # array.count >= 1 ==> nao carregar esta linha
- @sql = job_bigquery?('select ' + eos_fields + ' ' + sql_where) ? [{}, {}] : job.data
+ # @return [String] valores formatados bigquery.etht parte1
+ def etht_val1(hes)
+ "#{Integer(hes['blockNumber'])}," \
+ "#{Integer(hes['timeStamp'])}," \
+ "'#{hes['hash']}'," \
+ "#{Integer(hes['nonce'])}," \
+ "'#{hes['blockHash']}'," \
+ "#{Integer(hes['transactionIndex'])}," \
+ "'#{hes['from']}'," \
+ "'#{hes['to']}'," \
+ "#{etht_val2(hes)}"
end
- # @return [String] parte sql para processamento linhas existentes
- def sql_where
- "from hernanirvaz.coins.eos where blocknumber=#{row[0]}"
+ # @return [String] valores formatados bigquery.etht parte2
+ def etht_val2(hes)
+ "cast('#{hes['value']}' as numeric)," \
+ "cast('#{hes['gas']}' as numeric)," \
+ "cast('#{hes['gasPrice']}' as numeric)," \
+ "#{Integer(hes['isError'])}," \
+ "#{hes['txreceipt_status'].length.zero? ? 'null' : hes['txreceipt_status']}," \
+ "#{hes['input'].length.zero? ? 'null' : "'#{hes['input']}'"}," \
+ "#{etht_val3(hes)}"
end
- # @return [Integer] numero linhas inseridas
- def sql_insert
- return 1 unless linha[:i]
+ # @return [String] valores formatados bigquery.etht parte3
+ def etht_val3(hes)
+ "#{hes['contractAddress'].length.zero? ? 'null' : "'#{hes['contractAddress']}'"}," \
+ "0,cast('#{hes['gasUsed']}' as numeric),0," \
+ "#{Integer(ops[:h][hes['blockNumber']] || 0)}"
+ end
- dml('insert hernanirvaz.coins.eos(' + eos_fields + ') VALUES(' + str_insert1)
+ # @return [String] comando insert SQL formatado bigquery.ethk
+ def ethk_ins
+ "insert #{BD}.ethk(blocknumber,timestamp,txhash,nonce,blockhash,axfrom,contractaddress,axto,value,tokenname," \
+ 'tokensymbol,tokendecimal,transactionindex,gas,gasprice,gasused,cumulativegasused,input,confirmations,dias' \
+ ") VALUES(#{carteiras.novaesk.map { |e| ethk_val1(e) }.join(',')})"
end
- # @return [String] campos da tabela no bigquery
- def eos_fields
- 'blocknumber,time,contract,action,acfrom,acto,amount,symbol,memo,data,dias'
+ # @return [String] valores formatados bigquery.ethk parte1
+ def ethk_val1(hes)
+ "#{Integer(hes['blockNumber'])}," \
+ "#{Integer(hes['timeStamp'])}," \
+ "'#{hes['hash']}'," \
+ "#{Integer(hes['nonce'])}," \
+ "'#{hes['blockHash']}'," \
+ "'#{hes['from']}'," \
+ "#{ethk_val2(hes)}"
end
- # @return [String] campos insert da linha bigquery
- def str_insert1
- "#{row[0]},'#{DateTime.parse(row[1]).strftime(DI)}','#{row[2]}'," + str_insert2
+ # @return [String] valores formatados bigquery.ethk parte2
+ def ethk_val2(hes)
+ "#{hes['contractAddress'].length.zero? ? 'null' : "'#{hes['contractAddress']}'"}," \
+ "'#{hes['to']}'," \
+ "cast('#{hes['value']}' as numeric)," \
+ "'#{hes['tokenName']}'," \
+ "'#{hes['tokenSymbol']}'," \
+ "#{Integer(hes['tokenDecimal'])}," \
+ "#{Integer(hes['transactionIndex'])}," \
+ "#{ethk_val3(hes)}"
end
- # @return [String] campos insert da linha bigquery
- def str_insert2
- "'#{row[3]}','#{row[4]}','#{row[5]}',#{row[6].to_f},'#{row[7]}','#{row[8]}','#{row[9]}',0)"
+ # @return [String] valores formatados bigquery.ethk parte3
+ def ethk_val3(hes)
+ "cast('#{hes['gas']}' as numeric)," \
+ "cast('#{hes['gasPrice']}' as numeric)," \
+ "cast('#{hes['gasUsed']}' as numeric),0," \
+ "#{hes['input'].length.zero? ? 'null' : "'#{hes['input']}'"},0," \
+ "#{Integer(ops[:h][hes['blockNumber']] || 0)}"
end
- # @return [Integer] numero linhas apagadas
- def sql_delete
- dml('delete ' + sql_where)
+ # cria job bigquery & verifica execucao
+ #
+ # @param [String] sql a executar
+ # @return [Boolean] job ok?
+ def job?(sql)
+ @job = api.query_job(sql)
+ @job.wait_until_done!
+ puts(@job.error['message']) if @job.failed?
+ @job.failed?
+ end
+
+ # cria Data Manipulation Language (DML) job bigquery
+ #
+ # @param (see job?)
+ # @return [Integer] numero linhas afetadas
+ def dml(sql)
+ job?(sql) ? 0 : job.num_dml_affected_rows
+ end
+
+ # cria Structured Query Language (SQL) job bigquery
+ #
+ # @param (see job?)
+ # @return [Array<Hash>] resultados do SQL
+ def sql(sql)
+ job?(sql) ? [] : job.data
end
end
end