lib/eost/bigquery.rb in eost-0.1.7 vs lib/eost/bigquery.rb in eost-0.1.8

- old
+ new

@@ -1,100 +1,166 @@ # frozen_string_literal: true -require 'roo' -require 'google/cloud/bigquery' +require('roo') +require('google/cloud/bigquery') module Eost + BD = 'hernanirvaz.coins' DF = '%Y-%m-%d' DI = '%Y-%m-%d %H:%M:%S' # (see Bigquery) class Bigquery - # @return [Google::Cloud::Bigquery] API bigquery - attr_reader :apibq # @return [Roo::CSV] folha calculo a processar attr_reader :folha # @return [Hash<Symbol, Boolean>] opcoes trabalho com linhas attr_reader :linha + # @return [Google::Cloud::Bigquery] API bigquery + attr_reader :api # @return [Array] row folha calculo em processamento attr_reader :row # @return [Google::Cloud::Bigquery::QueryJob] job bigquery attr_reader :job - # @return (see sql_select) - attr_reader :sql + # @return (see sql) + attr_reader :sqr # @param [String] csv folha calculo para processar # @param [Hash<Symbol, Boolean>] ops opcoes trabalho com linhas # @option ops [Boolean] :e (false) apaga linha igual? # @option ops [Boolean] :m (false) apaga linhas existencia multipla? # @option ops [Boolean] :i (false) insere linha nova? - # @return [Bigquery] acesso folhas calculo bloks.io - # & correspondente bigquery dataset + # @return [Bigquery] acesso folhas calculo bloks.io & correspondente bigquery dataset def initialize(csv = '', ops = { e: false, m: false, i: false }) - # usa env GOOGLE_APPLICATION_CREDENTIALS para obter credentials - # @see https://cloud.google.com/bigquery/docs/authentication/getting-started - @apibq = Google::Cloud::Bigquery.new @folha = Roo::CSV.new(csv) if csv.size.positive? @linha = ops + + # usa env GOOGLE_APPLICATION_CREDENTIALS para obter credentials + # @see https://cloud.google.com/bigquery/docs/authentication/getting-started + @api = Google::Cloud::Bigquery.new end - # cria job bigquery & verifica execucao - # - # @param [String] sql a executar - # @return [Boolean] job ok? - def job_bigquery?(sql) - @job = apibq.query_job(sql) - @job.wait_until_done! - puts @job.error['message'] if @job.failed? - @job.failed? + # @return [Carteiras] API eosscan - processar transacoes + def transacoes + @transacoes ||= Carteiras.new( + { + wb: sql("select * from #{BD}.walletEos order by 1").map { |e| { ax: e[:weos], sl: e[:eos].to_d } }, + nt: sql("select blocknumber,iax from #{BD}.eostx order by 1") + }, + linha + ) end - # cria Data Manipulation Language (DML) job bigquery - # - # @param (see job_bigquery?) - # @return [Integer] numero linhas afetadas - def dml(sql) - job_bigquery?(sql) ? 0 : job.num_dml_affected_rows + # @return [Carteiras] API eosscan - processar carteiras & transacoes + def carteiras + transacoes end - # pesquisa existencia linha folha calculo no bigquery - # - # @return [Google::Cloud::Bigquery::Data] resultado do sql num array<hash> - def sql_select - # array.count = 0 ==> pode carregar esta linha - # array.count >= 1 ==> nao carregar esta linha - @sql = job_bigquery?('select * ' + sql_where) ? [{}, {}] : job.data + # insere transacoes novas na tabela eos + def processa + puts(format("%<n>2i LINHAS INSERIDAS #{BD}.eos", n: transacoes.novas.count.positive? ? eos_insert_api : 0)) end + # @return [String] campos da tabela eos no bigquery + def eos_fields + 'blocknumber,time,contract,action,acfrom,acto,amount,symbol,memo,data,dias' + end + # @return [String] parte sql para processamento linhas existentes def sql_where - "from hernanirvaz.coins.eos where blocknumber=#{row[0]}" + "from #{BD}.eos where blocknumber=#{row[0]}" end # @return [Integer] numero linhas inseridas - def sql_insert + def eos_insert_csv return 1 unless linha[:i] - dml('insert hernanirvaz.coins.eos(blocknumber,time,contract,' \ - 'action,acfrom,acto,amount,symbol,memo,data,dias) VALUES(' + - str_insert1) + dml("INSERT #{BD}.eos(#{eos_fields}) VALUES(#{eos_csv_val1})") end - # @return [String] campos insert da linha bigquery - def str_insert1 - "#{row[0]},'#{DateTime.parse(row[1]).strftime(DI)}','#{row[2]}'," + - str_insert2 + # @return [String] valores formatados para insert eos (parte1) + def eos_csv_val1 + "#{row[0]}," \ + "'#{Time.parse(row[1]).strftime(DI)}'," \ + "'#{row[2]}'," \ + "#{eos_csv_val2}" end - # @return [String] campos insert da linha bigquery - def str_insert2 - "'#{row[3]}','#{row[4]}','#{row[5]}',#{row[6].to_f}," \ - "'#{row[7]}','#{row[8]}','#{row[9]}',0)" + # @return [String] valores formatados para insert eos (parte2) + def eos_csv_val2 + "'#{row[3]}'," \ + "'#{row[4]}'," \ + "'#{row[5]}'," \ + "#{Float(row[6])}," \ + "'#{row[7]}'," \ + "'#{row[8]}'," \ + "'#{row[9]}',0" end - # @return [Integer] numero linhas apagadas - def sql_delete - dml('delete ' + sql_where) + # @return [Integer] numero linhas inseridas + def eos_insert_api + dml("INSERT #{BD}.eos(#{eos_fields}) VALUES#{transacoes.novas.map { |e| eos_api_val1(e) }.join(',')}") + end + + # @param [Hash] htx transacao ligadas a uma carteira - sem elementos irrelevantes + # @return [String] valores formatados para insert eos (parte1) + def eos_api_val1(htx) + "(#{Integer(htx['block_num'])}," \ + "DATETIME(TIMESTAMP('#{htx['block_time']}'))," \ + "'#{act(htx)['account']}'," \ + "'#{act(htx)['name']}'," \ + "'#{act_data(htx)['from']}'," \ + "'#{act_data(htx)['to']}'," \ + "#{eos_api_val2(htx)}" + end + + # @param [Hash] htx transacao ligadas a uma carteira - sem elementos irrelevantes + # @return [String] valores formatados para insert eos (parte2) + def eos_api_val2(htx) + "#{act_data(htx)['quantity'].to_d}," \ + "'#{act_data(htx)['quantity'][/[[:upper:]]+/]}'," \ + "'#{act_data(htx)['memo']}'," \ + "'#{act_data(htx)}'," \ + "#{Integer(linha[:h][String(htx['block_num'])] || 0)})" + end + + # @param [Hash] htx transacao normal + # @return [Hash] dados da acao + def act(htx) + htx['action_trace']['act'] + end + + # @param [Hash] htx transacao normal + # @return [Hash] dados da acao + def act_data(htx) + act(htx)['data'] + end + + # cria job bigquery & verifica execucao + # + # @param cmd (see sql) + # @return [Boolean] job ok? + def job?(cmd) + @job = api.query_job(cmd) + @job.wait_until_done! + puts(@job.error['message']) if @job.failed? + @job.failed? + end + + # cria Structured Query Language (SQL) job bigquery + # + # @param [String] cmd comando SQL a executar + # @param [Array<Hash>] red resultado quando SQL tem erro + # @return [Google::Cloud::Bigquery::Data] resultado do SQL + def sql(cmd, red = []) + @sqr = job?(cmd) ? red : job.data + end + + # cria Data Manipulation Language (DML) job bigquery + # + # @param cmd (see sql) + # @return [Integer] numero linhas afetadas + def dml(cmd) + job?(cmd) ? 0 : job.num_dml_affected_rows end end end