# frozen_string_literal: true

require('google/cloud/bigquery')
require('bigdecimal/util')

# @author Hernani Rodrigues Vaz
module Krkt
  # prefix used to qualify every table name in the SQL built below
  BD = 'hernanirvaz.coins'

  # Processes kraken transactions and stores them through bigquery
  class Bigquery
    # @return [Google::Cloud::Bigquery] bigquery API
    attr_reader :api
    # @return [Google::Cloud::Bigquery::QueryJob] last bigquery job created by {#job?}
    attr_reader :job
    # @return [Thor::CoreExt::HashWithIndifferentAccess] work options
    attr_reader :ops
    # @return (see sql)
    attr_reader :sqr

    # @param [Thor::CoreExt::HashWithIndifferentAccess] pop work options
    # @option pop [Hash] :h ({}) configuration for the temporal repositioning adjustment
    # @option pop [Boolean] :v (false) show normal transactions & ledger?
    # @option pop [Boolean] :t (false) show all transactions or only the new ones?
    # @return [Bigquery] bigquery API & kraken API
    def initialize(pop)
      # credentials are obtained from the GOOGLE_APPLICATION_CREDENTIALS env variable
      # @see https://cloud.google.com/bigquery/docs/authentication/getting-started
      @api = Google::Cloud::Bigquery.new
      @ops = pop
    end

    # Memoized kraken API object, built from three bigquery selects plus the work options.
    #
    # @return [Kraken] kraken API - processes normal transactions and ledgers
    def transacoes
      @transacoes ||= Kraken.new(
        {
          sl: sql("select * from #{BD}.ussl")[0],
          nt: sql("select * from #{BD}.ustx order by time,txid"),
          nl: sql("select * from #{BD}.uslx order by time,txid")
        },
        ops
      )
    end

    # Inserts the new transactions into tables ust (trades) and usl (ledger)
    # and prints how many rows each insert affected (0 when there was nothing to insert).
    def processa
      # named reference %<n>2i is required because the value is passed as keyword n:
      # (a positional %2i would receive the whole hash and raise TypeError)
      puts(format("%<n>2i TRADES INSERIDAS #{BD}.ust", n: transacoes.trades.count.positive? ? dml(ust_ins) : 0))
      puts(format("%<n>2i LEDGER INSERIDAS #{BD}.usl", n: transacoes.ledger.count.positive? ? dml(usl_ins) : 0))
    end

    # NOTE(review): values are interpolated directly into the SQL text;
    # presumably they only ever come from kraken API responses - confirm.
    #
    # @return [String] formatted SQL insert command for ust (trades)
    def ust_ins
      "insert #{BD}.ust(txid,ordertxid,pair,time,type,ordertype,price,cost,fee,vol,margin,misc,ledgers,dias" \
      ") VALUES#{transacoes.trades.map { |k, v| ust_val1(k, v) }.join(',')}"
    end

    # @param [String] idx trade txid
    # @param [Hash] hes trade fields keyed by string
    # @return [String] formatted values for ust (trades part 1)
    def ust_val1(idx, hes)
      "('#{idx}'," \
      "'#{hes['ordertxid']}'," \
      "'#{hes['pair']}'," \
      "PARSE_DATETIME('%s', '#{String(hes['time'].round)}')," \
      "'#{hes['type']}'," \
      "'#{hes['ordertype']}'," \
      "cast(#{hes['price']} as numeric)," \
      "cast(#{hes['cost']} as numeric)," \
      "cast(#{hes['fee']} as numeric)," \
      "#{ust_val2(idx, hes)}"
    end

    # @param (see ust_val1)
    # @return [String] formatted values for ust (trades part 2)
    def ust_val2(idx, hes)
      # txids of every ledger entry whose refid points at this trade
      ledgers = transacoes.ledger.select { |_, v| v['refid'] == idx }.keys.join(',')
      # tolerate a missing :h option instead of raising NoMethodError on nil
      dias = Integer((ops[:h] || {})[idx] || 0)

      "cast(#{hes['vol']} as numeric)," \
      "cast(#{hes['margin']} as numeric)," \
      "#{hes['misc'].empty? ? 'null' : "'#{hes['misc']}'"}," \
      "'#{ledgers}'," \
      "#{dias})"
    end

    # @return [String] formatted SQL insert command for usl (ledger)
    def usl_ins
      "insert #{BD}.usl(txid,refid,time,type,aclass,asset,amount,fee" \
      ") VALUES#{transacoes.ledger.map { |k, v| usl_val1(k, v) }.join(',')}"
    end

    # @param [String] idx ledger txid
    # @param [Hash] hes ledger fields keyed by string
    # @return [String] formatted values for usl (ledger part 1)
    def usl_val1(idx, hes)
      "('#{idx}'," \
      "'#{hes['refid']}'," \
      "PARSE_DATETIME('%s', '#{String(hes['time'].round)}')," \
      "'#{hes['type']}'," \
      "#{hes['aclass'].empty? ? 'null' : "'#{hes['aclass']}'"}," \
      "'#{hes['asset']}'," \
      "cast(#{hes['amount']} as numeric)," \
      "cast(#{hes['fee']} as numeric))"
    end

    # Creates a bigquery job, waits for it to finish and prints the error message on failure.
    #
    # @param cmd (see sql)
    # @return [Boolean] true when the job FAILED, false when it completed without error
    def job?(cmd)
      @job = api.query_job(cmd)
      @job.wait_until_done!
      puts(@job.error['message']) if @job.failed?
      @job.failed?
    end

    # Runs a Structured Query Language (SQL) job on bigquery
    #
    # @param [String] cmd SQL command to execute
    # @param [Array] red result returned when the SQL job fails
    # @return [Google::Cloud::Bigquery::Data] SQL result (or +red+ on failure)
    def sql(cmd, red = [])
      @sqr = job?(cmd) ? red : job.data
    end

    # Runs a Data Manipulation Language (DML) job on bigquery
    #
    # @param cmd (see sql)
    # @return [Integer] number of affected rows (0 when the job fails)
    def dml(cmd)
      job?(cmd) ? 0 : job.num_dml_affected_rows
    end
  end
end