lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.4 vs lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.5

- old
+ new

@@ -1,26 +1,40 @@ module Bitcoin module Grpc class Server < Bitcoin::Grpc::Blockchain::Service - def self.run(spv) + def self.run(spv, publisher, utxo_handler, asset_handler) addr = "0.0.0.0:8080" s = GRPC::RpcServer.new s.add_http2_port(addr, :this_port_is_insecure) - s.handle(new(spv)) + s.handle(new(spv, publisher, utxo_handler, asset_handler)) s.run_till_terminated end attr_reader :spv, :utxo_handler, :asset_handler, :publisher, :logger - def initialize(spv) + def initialize(spv, publisher, utxo_handler, asset_handler) @spv = spv - @publisher = Bitcoin::Wallet::Publisher.spawn(:publisher) - @utxo_handler = Bitcoin::Wallet::UtxoHandler.spawn(:utxo_handler, spv, publisher) - @asset_handler = Bitcoin::Wallet::AssetHandler.spawn(:asset_handler, spv, publisher) + @publisher = publisher + @utxo_handler = utxo_handler + @asset_handler = asset_handler @logger = Bitcoin::Logger.create(:debug) end + def get_blockchain_info(request, call) + best_block = spv.chain.latest_block + GetBlockchainInfoResponse.new( + chain: Bitcoin.chain_params.network.to_s, + headers: best_block.height, + bestblockhash: best_block.header.block_id, + chainwork: best_block.header.work, + mediantime: spv.chain.mtp(best_block.block_hash) + ) + rescue => e + logger.info("get_blockchain_info: #{e.message}") + logger.info("get_blockchain_info: #{e.backtrace}") + end + def watch_tx_confirmed(request, call) logger.info("watch_tx_confirmed: #{request}") utxo_handler << request response = [] Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventTxConfirmed]) @@ -41,10 +55,22 @@ rescue => e logger.info("watch_utxo: #{e.message}") logger.info("watch_utxo: #{e.backtrace}") end + def watch_utxo_spent(request, call) + logger.info("watch_utxo_spent: #{request}") + utxo_handler << request + response = [] + Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventUtxoSpent]) + logger.info("watch_utxo_spent: end") + ResponseEnum.new(request, response, WatchUtxoSpentResponseBuilder).each + rescue => e + logger.info("watch_utxo_spent: #{e.message}") + logger.info("watch_utxo_spent: #{e.backtrace}") + end + def watch_token(request, call) logger.info("watch_token: #{request}") utxo_handler << request response = [] Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered]) @@ -52,12 +78,79 @@ ResponseEnum.new(request, response, WatchTokenResponseBuilder).each rescue => e logger.info("watch_token: #{e.message}") logger.info("watch_token: #{e.backtrace}") end + + def events(requests) + logger.info("events: #{requests}") + events = [] + + receiver = EventsReceiver.spawn(:receiver, events, publisher) + requests.each do |request| + receiver << request + end + + logger.info("events: end") + EventsResponseEnum.new(events).each + rescue => e + logger.error("events: #{e.message}") + logger.error("events: #{e.backtrace}") + end end + class EventsReceiver < Concurrent::Actor::Context + attr_reader :events, :logger, :publisher + + def initialize(events, publisher) + @events = events + @publisher = publisher + @logger = Bitcoin::Logger.create(:debug) + end + + def on_message(message) + case message + when Bitcoin::Grpc::EventsRequest + clazz = Object.const_get("Bitcoin").const_get("Grpc").const_get(message.event_type) + case message.operation + when :SUBSCRIBE + publisher << [:subscribe, clazz] + when :UNSUBSCRIBE + publisher << [:unsubscribe, clazz] + else + logger.error("unsupported operation") + end + else + events << message + end + end + end + + class EventsResponseEnum + attr_reader :events, :logger + + def initialize(events) + @events = events + @logger = Bitcoin::Logger.create(:debug) + end + + def each + return enum_for(:each) unless block_given? + loop do + event = events.shift + if event + response = Bitcoin::Grpc::EventsResponse.new + field = event.class.name.split('::').last.snake + response[field] = event + yield response + else + sleep(1) + end + end + end + end + class WatchTxConfirmedResponseBuilder def self.build(id, event) case event when Bitcoin::Grpc::EventTxConfirmed Bitcoin::Grpc::WatchTxConfirmedResponse.new(confirmed: event) @@ -70,9 +163,18 @@ case event when Bitcoin::Grpc::EventUtxoRegistered Bitcoin::Grpc::WatchUtxoResponse.new(id: id, registered: event) when Bitcoin::Grpc::EventUtxoSpent Bitcoin::Grpc::WatchUtxoResponse.new(id: id, spent: event) + end + end + end + + class WatchUtxoSpentResponseBuilder + def self.build(id, event) + case event + when Bitcoin::Grpc::EventUtxoSpent + Bitcoin::Grpc::WatchUtxoSpentResponse.new(id: id, spent: event) end end end class WatchTokenResponseBuilder