lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.1 vs lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.2
- old
+ new
@@ -21,75 +21,79 @@
def watch_tx_confirmed(request, call)
logger.info("watch_tx_confirmed: #{request}")
utxo_handler << request
channel = Concurrent::Channel.new
- Receiver.spawn(:receiver, channel, publisher, [Bitcoin::Grpc::EventTxConfirmed])
+ Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTxConfirmed])
ResponseEnum.new(request, channel, WatchTxConfirmedResponseBuilder).each
end
def watch_utxo(request, call)
logger.info("watch_utxo: #{request}")
utxo_handler << request
channel = Concurrent::Channel.new
- Receiver.spawn(:receiver, channel, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent])
+ Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent])
ResponseEnum.new(request, channel, WatchUtxoResponseBuilder).each
end
def watch_token(request, call)
logger.info("watch_token: #{request}")
utxo_handler << request
channel = Concurrent::Channel.new
- Receiver.spawn(:receiver, channel, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered])
+ Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered])
ResponseEnum.new(request, channel, WatchTokenResponseBuilder).each
end
end
class WatchTxConfirmedResponseBuilder
- def self.build(event)
+ def self.build(id, event)
case event
when Bitcoin::Grpc::EventTxConfirmed
Bitcoin::Grpc::WatchTxConfirmedResponse.new(confirmed: event)
end
end
end
class WatchUtxoResponseBuilder
- def self.build(event)
+ def self.build(id, event)
case event
when Bitcoin::Grpc::EventUtxoRegistered
- Bitcoin::Grpc::WatchUtxoResponse.new(registered: event)
+ Bitcoin::Grpc::WatchUtxoResponse.new(id: id, registered: event)
when Bitcoin::Grpc::EventUtxoSpent
- Bitcoin::Grpc::WatchUtxoResponse.new(spent: event)
+ Bitcoin::Grpc::WatchUtxoResponse.new(id: id, spent: event)
end
end
end
class WatchTokenResponseBuilder
- def self.build(event)
+ def self.build(id, event)
case event
when Bitcoin::Grpc::EventTokenIssued
- Bitcoin::Grpc::WatchTokenResponse.new(issued: event)
+ Bitcoin::Grpc::WatchTokenResponse.new(id: id, issued: event)
when Bitcoin::Grpc::EventTokenTransfered
- Bitcoin::Grpc::WatchTokenResponse.new(transfered: event)
+ Bitcoin::Grpc::WatchTokenResponse.new(id: id, transfered: event)
when Bitcoin::Grpc::EventTokenBurned
- Bitcoin::Grpc::WatchTokenResponse.new(burned: event)
+ Bitcoin::Grpc::WatchTokenResponse.new(id: id, burned: event)
end
end
end
class Receiver < Concurrent::Actor::Context
include Concurrent::Concern::Logging
- attr_reader :channel
- def initialize(channel, publisher, classes)
+ attr_reader :channel, :request
+
+ def initialize(channel, request, publisher, classes)
@channel = channel
+ @request = request
classes.each {|c| publisher << [:subscribe, c] }
end
def on_message(message)
- log(::Logger::DEBUG, "Receiver#on_message:#{message}")
- channel << message
+ if request.id == message.request_id
+ log(::Logger::DEBUG, "Receiver#on_message:#{message}")
+ channel << message
+ end
end
end
class ResponseEnum
attr_reader :req, :channel, :wrapper_classs, :logger
@@ -103,10 +107,11 @@
def each
logger.info("ResponseEnum#each")
return enum_for(:each) unless block_given?
loop do
- yield wrapper_classs.build(channel.take)
+ event = channel.take
+ yield wrapper_classs.build(event.request_id, event)
end
end
end
end
end