lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.2 vs lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.3
- old
+ new
@@ -20,29 +20,41 @@
end
def watch_tx_confirmed(request, call)
logger.info("watch_tx_confirmed: #{request}")
utxo_handler << request
- channel = Concurrent::Channel.new
+ channel = Concurrent::Channel.new(capacity: 100)
Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTxConfirmed])
ResponseEnum.new(request, channel, WatchTxConfirmedResponseBuilder).each
+ logger.info("watch_tx_confirmed: end")
+ rescue => e
+ logger.info("watch_tx_confirmed: #{e.message}")
+ logger.info("watch_tx_confirmed: #{e.backtrace}")
end
def watch_utxo(request, call)
logger.info("watch_utxo: #{request}")
utxo_handler << request
- channel = Concurrent::Channel.new
+ channel = Concurrent::Channel.new(capacity: 100)
Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent])
ResponseEnum.new(request, channel, WatchUtxoResponseBuilder).each
+ logger.info("watch_utxo: end")
+ rescue => e
+ logger.info("watch_utxo: #{e.message}")
+ logger.info("watch_utxo: #{e.backtrace}")
end
def watch_token(request, call)
logger.info("watch_token: #{request}")
utxo_handler << request
- channel = Concurrent::Channel.new
+ channel = Concurrent::Channel.new(capacity: 100)
Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered])
ResponseEnum.new(request, channel, WatchTokenResponseBuilder).each
+ logger.info("watch_token: end")
+ rescue => e
+ logger.info("watch_token: #{e.message}")
+ logger.info("watch_token: #{e.backtrace}")
end
end
class WatchTxConfirmedResponseBuilder
def self.build(id, event)
@@ -86,10 +98,11 @@
@channel = channel
@request = request
classes.each {|c| publisher << [:subscribe, c] }
end
def on_message(message)
+ log(::Logger::DEBUG, "Receiver#on_message:#{message}")
if request.id == message.request_id
log(::Logger::DEBUG, "Receiver#on_message:#{message}")
channel << message
end
end
@@ -108,9 +121,10 @@
def each
logger.info("ResponseEnum#each")
return enum_for(:each) unless block_given?
loop do
event = channel.take
+ channel.close
yield wrapper_classs.build(event.request_id, event)
end
end
end
end