lib/eost/bigquery.rb in eost-0.1.7 vs lib/eost/bigquery.rb in eost-0.1.8
- old
+ new
@@ -1,100 +1,166 @@
# frozen_string_literal: true
-require 'roo'
-require 'google/cloud/bigquery'
+require('roo')
+require('google/cloud/bigquery')
module Eost
+ BD = 'hernanirvaz.coins'
DF = '%Y-%m-%d'
DI = '%Y-%m-%d %H:%M:%S'
# (see Bigquery)
class Bigquery
- # @return [Google::Cloud::Bigquery] API bigquery
- attr_reader :apibq
# @return [Roo::CSV] folha calculo a processar
attr_reader :folha
# @return [Hash<Symbol, Boolean>] opcoes trabalho com linhas
attr_reader :linha
+ # @return [Google::Cloud::Bigquery] API bigquery
+ attr_reader :api
# @return [Array] row folha calculo em processamento
attr_reader :row
# @return [Google::Cloud::Bigquery::QueryJob] job bigquery
attr_reader :job
- # @return (see sql_select)
- attr_reader :sql
+ # @return (see sql)
+ attr_reader :sqr
# @param [String] csv folha calculo para processar
# @param [Hash<Symbol, Boolean>] ops opcoes trabalho com linhas
# @option ops [Boolean] :e (false) apaga linha igual?
# @option ops [Boolean] :m (false) apaga linhas existencia multipla?
# @option ops [Boolean] :i (false) insere linha nova?
- # @return [Bigquery] acesso folhas calculo bloks.io
- # & correspondente bigquery dataset
+ # @return [Bigquery] acesso folhas calculo bloks.io & correspondente bigquery dataset
def initialize(csv = '', ops = { e: false, m: false, i: false })
- # usa env GOOGLE_APPLICATION_CREDENTIALS para obter credentials
- # @see https://cloud.google.com/bigquery/docs/authentication/getting-started
- @apibq = Google::Cloud::Bigquery.new
@folha = Roo::CSV.new(csv) if csv.size.positive?
@linha = ops
+
+ # usa env GOOGLE_APPLICATION_CREDENTIALS para obter credentials
+ # @see https://cloud.google.com/bigquery/docs/authentication/getting-started
+ @api = Google::Cloud::Bigquery.new
end
- # cria job bigquery & verifica execucao
- #
- # @param [String] sql a executar
- # @return [Boolean] job ok?
- def job_bigquery?(sql)
- @job = apibq.query_job(sql)
- @job.wait_until_done!
- puts @job.error['message'] if @job.failed?
- @job.failed?
+ # @return [Carteiras] API eosscan - processar transacoes
+ def transacoes
+ @transacoes ||= Carteiras.new(
+ {
+ wb: sql("select * from #{BD}.walletEos order by 1").map { |e| { ax: e[:weos], sl: e[:eos].to_d } },
+ nt: sql("select blocknumber,iax from #{BD}.eostx order by 1")
+ },
+ linha
+ )
end
- # cria Data Manipulation Language (DML) job bigquery
- #
- # @param (see job_bigquery?)
- # @return [Integer] numero linhas afetadas
- def dml(sql)
- job_bigquery?(sql) ? 0 : job.num_dml_affected_rows
+ # @return [Carteiras] API eosscan - processar carteiras & transacoes
+ def carteiras
+ transacoes
end
- # pesquisa existencia linha folha calculo no bigquery
- #
- # @return [Google::Cloud::Bigquery::Data] resultado do sql num array<hash>
- def sql_select
- # array.count = 0 ==> pode carregar esta linha
- # array.count >= 1 ==> nao carregar esta linha
- @sql = job_bigquery?('select * ' + sql_where) ? [{}, {}] : job.data
+ # insere transacoes novas na tabela eos
+ def processa
+ puts(format("%<n>2i LINHAS INSERIDAS #{BD}.eos", n: transacoes.novas.count.positive? ? eos_insert_api : 0))
end
+ # @return [String] campos da tabela eos no bigquery
+ def eos_fields
+ 'blocknumber,time,contract,action,acfrom,acto,amount,symbol,memo,data,dias'
+ end
+
# @return [String] parte sql para processamento linhas existentes
def sql_where
- "from hernanirvaz.coins.eos where blocknumber=#{row[0]}"
+ "from #{BD}.eos where blocknumber=#{row[0]}"
end
# @return [Integer] numero linhas inseridas
- def sql_insert
+ def eos_insert_csv
return 1 unless linha[:i]
- dml('insert hernanirvaz.coins.eos(blocknumber,time,contract,' \
- 'action,acfrom,acto,amount,symbol,memo,data,dias) VALUES(' +
- str_insert1)
+ dml("INSERT #{BD}.eos(#{eos_fields}) VALUES(#{eos_csv_val1})")
end
- # @return [String] campos insert da linha bigquery
- def str_insert1
- "#{row[0]},'#{DateTime.parse(row[1]).strftime(DI)}','#{row[2]}'," +
- str_insert2
+ # @return [String] valores formatados para insert eos (parte1)
+ def eos_csv_val1
+ "#{row[0]}," \
+ "'#{Time.parse(row[1]).strftime(DI)}'," \
+ "'#{row[2]}'," \
+ "#{eos_csv_val2}"
end
- # @return [String] campos insert da linha bigquery
- def str_insert2
- "'#{row[3]}','#{row[4]}','#{row[5]}',#{row[6].to_f}," \
- "'#{row[7]}','#{row[8]}','#{row[9]}',0)"
+ # @return [String] valores formatados para insert eos (parte2)
+ def eos_csv_val2
+ "'#{row[3]}'," \
+ "'#{row[4]}'," \
+ "'#{row[5]}'," \
+ "#{Float(row[6])}," \
+ "'#{row[7]}'," \
+ "'#{row[8]}'," \
+ "'#{row[9]}',0"
end
- # @return [Integer] numero linhas apagadas
- def sql_delete
- dml('delete ' + sql_where)
+ # @return [Integer] numero linhas inseridas
+ def eos_insert_api
+ dml("INSERT #{BD}.eos(#{eos_fields}) VALUES#{transacoes.novas.map { |e| eos_api_val1(e) }.join(',')}")
+ end
+
+ # @param [Hash] htx transacao ligadas a uma carteira - sem elementos irrelevantes
+ # @return [String] valores formatados para insert eos (parte1)
+ def eos_api_val1(htx)
+ "(#{Integer(htx['block_num'])}," \
+ "DATETIME(TIMESTAMP('#{htx['block_time']}'))," \
+ "'#{act(htx)['account']}'," \
+ "'#{act(htx)['name']}'," \
+ "'#{act_data(htx)['from']}'," \
+ "'#{act_data(htx)['to']}'," \
+ "#{eos_api_val2(htx)}"
+ end
+
+ # @param [Hash] htx transacao ligadas a uma carteira - sem elementos irrelevantes
+ # @return [String] valores formatados para insert eos (parte2)
+ def eos_api_val2(htx)
+ "#{act_data(htx)['quantity'].to_d}," \
+ "'#{act_data(htx)['quantity'][/[[:upper:]]+/]}'," \
+ "'#{act_data(htx)['memo']}'," \
+ "'#{act_data(htx)}'," \
+ "#{Integer(linha[:h][String(htx['block_num'])] || 0)})"
+ end
+
+ # @param [Hash] htx transacao normal
+ # @return [Hash] dados da acao
+ def act(htx)
+ htx['action_trace']['act']
+ end
+
+ # @param [Hash] htx transacao normal
+ # @return [Hash] dados da acao
+ def act_data(htx)
+ act(htx)['data']
+ end
+
+ # cria job bigquery & verifica execucao
+ #
+ # @param cmd (see sql)
+ # @return [Boolean] job ok?
+ def job?(cmd)
+ @job = api.query_job(cmd)
+ @job.wait_until_done!
+ puts(@job.error['message']) if @job.failed?
+ @job.failed?
+ end
+
+ # cria Structured Query Language (SQL) job bigquery
+ #
+ # @param [String] cmd comando SQL a executar
+ # @param [Array<Hash>] red resultado quando SQL tem erro
+ # @return [Google::Cloud::Bigquery::Data] resultado do SQL
+ def sql(cmd, red = [])
+ @sqr = job?(cmd) ? red : job.data
+ end
+
+ # cria Data Manipulation Language (DML) job bigquery
+ #
+ # @param cmd (see sql)
+ # @return [Integer] numero linhas afetadas
+ def dml(cmd)
+ job?(cmd) ? 0 : job.num_dml_affected_rows
end
end
end