module TxCatcher class Catcher attr_accessor :name, :sockets def initialize(name:, socket: "ipc:///tmp/") @queue = {} @sockets = {} {'rawtx' => "#{socket}#{name}.rawtx", 'hashblock' => "#{socket}#{name}.hashblock"}.each do |channel, address| puts "Start listening on #{name} #{channel}... (#{address})" listen_to_zeromq_message(channel: channel, address: address) end end private def hexlify(s) a = [] s.each_byte do |b| a << sprintf('%02X', b) end a.join end def listen_to_zeromq_message(channel:, address:) @queue[channel] = Queue.new # This thread is responsible for actions after the messages from ZeroMQ is parsed, # typically it's writing data to DB through the models. We start it # before we start listening to any messages from ZeroMQ. queue_thread = Thread.new do loop do puts "in #{channel} queue: #{@queue[channel].size}" if @queue[channel].empty? sleep 1 else begin @queue[channel].pop.call rescue Sequel::ValidationFailed => e $stdout.puts "[WARNING #{Time.now.to_s}] #{e.class} #{e.to_s}\n" rescue StandardError => e File.open(TxCatcher::Config.config_dir + "/error.log", "a") do |f| f.puts "[ERROR #{Time.now.to_s}] #{e.class} #{e.to_s}\n #{e.backtrace.join("\n ")}\n\n" end end end end end # Now we can start receiving messages from ZeroMQ. # On every received message we call a handler method, which parses it # appropriately (each ZeroMQ channel has its own handler method) and then # adds additional tasks, such as writing to the DB, in the queue. # They queue itself is handled in the thread created above. key = "#{channel}#{address}" handler_thread = Thread.new do context = ZMQ::Context.new socket = context.socket(ZMQ::SUB) socket.setsockopt(ZMQ::SUBSCRIBE, channel) socket.connect(address) @sockets[key] = { object: socket } loop do topic = [] message = [] socket.recv_multipart(topic, message) message if message[1] message_hex = hexlify(message[1].copy_out_string).downcase @sockets[key][:last_message] = message_hex send("handle_#{channel}", message_hex) end end end end # listen_to_zeromq_message def handle_rawtx(txhex) $stdout.print "received tx hex: #{txhex[0..50]}...\n" @queue["rawtx"] << ( Proc.new { tx = TxCatcher::Transaction.new(hex: txhex) tx.save $stdout.puts "tx #{tx.txid} saved (id: #{tx.id}), deposits (outputs):" tx.deposits.each do |d| $stdout.puts " id: #{d.id}, addr: #{d.address.address}, amount: #{Satoshi.new(d.amount, from_unit: :satoshi).to_btc}" end }) end def handle_hashblock(block_hex) block_hash = TxCatcher.rpc_node.getblock(block_hex) transactions = block_hash["tx"] height = TxCatcher.current_block_height = block_hash["height"].to_i $stdout.puts "Block #{height} mined, transactions received:\n #{transactions.join(" \n")}" @queue["hashblock"] << ( Proc.new { Transaction.where(txid: transactions).update(block_height: height) }) end end # class Catcher end