module TxCatcher class Catcher attr_accessor :break_all_loops attr_reader :name, :queue, :zeromq_threads, :queue_threads, :sockets def initialize(name:, socket_prefix: "ipc:///tmp/", init_threads: true) @socket_prefix = socket_prefix @name = name @queue = {} @sockets = {} @zeromq_threads = [] @queue_threads = [] ['rawtx', 'hashblock'].each do |channel| @queue_threads << Thread.new { listen_to_action_queues(channel) } @zeromq_threads << Thread.new { listen_to_zeromq_channels(channel) } end end def close_all_connections @break_all_loops = true (@zeromq_threads + @queue_threads).each { |t| t.kill } @sockets.each { |k,v| v[:object].close } end # Responsible for actions after the message from ZeroMQ is parsed, # typically it's writing data to DB through the models. We start it # before we start listening to any messages from ZeroMQ. def listen_to_action_queues(channel) @queue[channel] = Queue.new until @break_all_loops LOGGER.report "in #{channel} queue: #{@queue[channel].size}" if Config["logger"]["log_queue_info"] if @queue[channel].empty? sleep 1 else begin @queue[channel].pop.call rescue Sequel::ValidationFailed => e LOGGER.report e, :warn, timestamp: true rescue Exception => e LOGGER.report e, :error, timestamp: true end end end end # Now we can start receiving messages from ZeroMQ. # On every received message we call a handler method, which parses it # appropriately (each ZeroMQ channel has its own handler method) and then # adds additional tasks, such as writing to the DB, in the queue. # They queue itself is handled in the thread created above. def listen_to_zeromq_channels(channel) address = "#{@socket_prefix}#{@name}.#{channel}" LOGGER.report "Start listening on #{@name} #{channel}... (#{address})" context = ZMQ::Context.new socket = context.socket(ZMQ::SUB) socket.setsockopt(ZMQ::SUBSCRIBE, channel) socket.connect(address) @sockets[channel] = { object: socket } until @break_all_loops do message = [] socket.recv_strings(message) if message[1] message_hex = hexlify(message[1]).downcase @sockets[channel][:last_message] = message_hex send("handle_#{channel}", "#{message_hex}") end end end private def hexlify(s) a = [] s.each_byte do |b| a << sprintf('%02X', b) end a.join end def handle_rawtx(txhex) LOGGER.report "received tx hex: #{txhex[0..50]}..." @queue["rawtx"] << ( Proc.new { tx = TxCatcher::Transaction.new(hex: txhex) begin LOGGER.report "tx #{tx.txid} caught (id: #{tx.id}), deposits (outputs):" tx.save rescue Sequel::ValidationFailed => e if tx.errors[:txid].include?("is already taken") LOGGER.report " it's already in DB, no need to save it!" else raise e end end tx.deposits.each do |d| LOGGER.report " id: #{d.id}, addr: #{d.address.address}, amount: #{CryptoUnit.new(Config["currency"], d.amount, from_unit: :primary).to_standart}" end }) end def handle_hashblock(block_hex) block_hash = TxCatcher.rpc_node.getblock(block_hex) transactions = block_hash["tx"] height = TxCatcher.current_block_height = block_hash["height"].to_i LOGGER.report "*** Block #{height} mined, transactions received:\n #{transactions.join(" \n")}" @queue["hashblock"] << ( Proc.new { existing_transactions = Transaction.where(txid: transactions).map(&:txid) LOGGER.report "*** Block #{height} mined, transactions received:\n #{transactions.join(" \n")}" Transaction.where(txid: transactions).update(block_height: height) }) # Update RBF transactions and deposits if a transaction with lower fee (no associated deposit) got # accidentally confirmed. TxCatcher::Transaction.where(block_height: height).exclude(rbf_next_transaction_id: nil).each do |t| t.force_deposit_association_on_rbf! end end end # class Catcher end