lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.3 vs lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.4

- old
+ new

@@ -20,38 +20,38 @@ end def watch_tx_confirmed(request, call) logger.info("watch_tx_confirmed: #{request}") utxo_handler << request - channel = Concurrent::Channel.new(capacity: 100) - Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTxConfirmed]) - ResponseEnum.new(request, channel, WatchTxConfirmedResponseBuilder).each + response = [] + Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventTxConfirmed]) logger.info("watch_tx_confirmed: end") + ResponseEnum.new(request, response, WatchTxConfirmedResponseBuilder).each rescue => e logger.info("watch_tx_confirmed: #{e.message}") logger.info("watch_tx_confirmed: #{e.backtrace}") end def watch_utxo(request, call) logger.info("watch_utxo: #{request}") utxo_handler << request - channel = Concurrent::Channel.new(capacity: 100) - Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent]) - ResponseEnum.new(request, channel, WatchUtxoResponseBuilder).each + response = [] + Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent]) logger.info("watch_utxo: end") + ResponseEnum.new(request, response, WatchUtxoResponseBuilder).each rescue => e logger.info("watch_utxo: #{e.message}") logger.info("watch_utxo: #{e.backtrace}") end def watch_token(request, call) logger.info("watch_token: #{request}") utxo_handler << request - channel = Concurrent::Channel.new(capacity: 100) - Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered]) - ResponseEnum.new(request, channel, WatchTokenResponseBuilder).each + response = [] + Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered]) logger.info("watch_token: end") + ResponseEnum.new(request, response, WatchTokenResponseBuilder).each rescue => e logger.info("watch_token: #{e.message}") logger.info("watch_token: #{e.backtrace}") end end @@ -90,42 +90,43 @@ end class Receiver < Concurrent::Actor::Context include Concurrent::Concern::Logging - attr_reader :channel, :request + attr_reader :request, :response - def initialize(channel, request, publisher, classes) - @channel = channel + def initialize(request, response, publisher, classes) @request = request + @response = response classes.each {|c| publisher << [:subscribe, c] } end def on_message(message) - log(::Logger::DEBUG, "Receiver#on_message:#{message}") if request.id == message.request_id log(::Logger::DEBUG, "Receiver#on_message:#{message}") - channel << message + response << message end end end class ResponseEnum - attr_reader :req, :channel, :wrapper_classs, :logger + attr_reader :req, :response, :wrapper_classs, :logger - def initialize(req, channel, wrapper_classs) + def initialize(req, response, wrapper_classs) @req = req - @channel = channel + @response = response @wrapper_classs = wrapper_classs @logger = Bitcoin::Logger.create(:debug) end def each - logger.info("ResponseEnum#each") return enum_for(:each) unless block_given? loop do - event = channel.take - channel.close - yield wrapper_classs.build(event.request_id, event) + event = response.first + if event + yield wrapper_classs.build(event.request_id, event) + else + sleep(1) + end end end end end end