lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.1 vs lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.2

- old
+ new

@@ -21,75 +21,79 @@ def watch_tx_confirmed(request, call) logger.info("watch_tx_confirmed: #{request}") utxo_handler << request channel = Concurrent::Channel.new - Receiver.spawn(:receiver, channel, publisher, [Bitcoin::Grpc::EventTxConfirmed]) + Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTxConfirmed]) ResponseEnum.new(request, channel, WatchTxConfirmedResponseBuilder).each end def watch_utxo(request, call) logger.info("watch_utxo: #{request}") utxo_handler << request channel = Concurrent::Channel.new - Receiver.spawn(:receiver, channel, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent]) + Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent]) ResponseEnum.new(request, channel, WatchUtxoResponseBuilder).each end def watch_token(request, call) logger.info("watch_token: #{request}") utxo_handler << request channel = Concurrent::Channel.new - Receiver.spawn(:receiver, channel, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered]) + Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered]) ResponseEnum.new(request, channel, WatchTokenResponseBuilder).each end end class WatchTxConfirmedResponseBuilder - def self.build(event) + def self.build(id, event) case event when Bitcoin::Grpc::EventTxConfirmed Bitcoin::Grpc::WatchTxConfirmedResponse.new(confirmed: event) end end end class WatchUtxoResponseBuilder - def self.build(event) + def self.build(id, event) case event when Bitcoin::Grpc::EventUtxoRegistered - Bitcoin::Grpc::WatchUtxoResponse.new(registered: event) + Bitcoin::Grpc::WatchUtxoResponse.new(id: id, registered: event) when Bitcoin::Grpc::EventUtxoSpent - Bitcoin::Grpc::WatchUtxoResponse.new(spent: event) + Bitcoin::Grpc::WatchUtxoResponse.new(id: id, spent: event) end end end class WatchTokenResponseBuilder - def self.build(event) + def self.build(id, event) case event when Bitcoin::Grpc::EventTokenIssued - Bitcoin::Grpc::WatchTokenResponse.new(issued: event) + Bitcoin::Grpc::WatchTokenResponse.new(id: id, issued: event) when Bitcoin::Grpc::EventTokenTransfered - Bitcoin::Grpc::WatchTokenResponse.new(transfered: event) + Bitcoin::Grpc::WatchTokenResponse.new(id: id, transfered: event) when Bitcoin::Grpc::EventTokenBurned - Bitcoin::Grpc::WatchTokenResponse.new(burned: event) + Bitcoin::Grpc::WatchTokenResponse.new(id: id, burned: event) end end end class Receiver < Concurrent::Actor::Context include Concurrent::Concern::Logging - attr_reader :channel - def initialize(channel, publisher, classes) + attr_reader :channel, :request + + def initialize(channel, request, publisher, classes) @channel = channel + @request = request classes.each {|c| publisher << [:subscribe, c] } end def on_message(message) - log(::Logger::DEBUG, "Receiver#on_message:#{message}") - channel << message + if request.id == message.request_id + log(::Logger::DEBUG, "Receiver#on_message:#{message}") + channel << message + end end end class ResponseEnum attr_reader :req, :channel, :wrapper_classs, :logger @@ -103,10 +107,11 @@ def each logger.info("ResponseEnum#each") return enum_for(:each) unless block_given? loop do - yield wrapper_classs.build(channel.take) + event = channel.take + yield wrapper_classs.build(event.request_id, event) end end end end end