lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.3 vs lib/bitcoin/grpc/server.rb in bitcoinrb-grpc-0.1.4
- old
+ new
@@ -20,38 +20,38 @@
end
def watch_tx_confirmed(request, call)
logger.info("watch_tx_confirmed: #{request}")
utxo_handler << request
- channel = Concurrent::Channel.new(capacity: 100)
- Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTxConfirmed])
- ResponseEnum.new(request, channel, WatchTxConfirmedResponseBuilder).each
+ response = []
+ Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventTxConfirmed])
logger.info("watch_tx_confirmed: end")
+ ResponseEnum.new(request, response, WatchTxConfirmedResponseBuilder).each
rescue => e
logger.info("watch_tx_confirmed: #{e.message}")
logger.info("watch_tx_confirmed: #{e.backtrace}")
end
def watch_utxo(request, call)
logger.info("watch_utxo: #{request}")
utxo_handler << request
- channel = Concurrent::Channel.new(capacity: 100)
- Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent])
- ResponseEnum.new(request, channel, WatchUtxoResponseBuilder).each
+ response = []
+ Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventUtxoRegistered, Bitcoin::Grpc::EventUtxoSpent])
logger.info("watch_utxo: end")
+ ResponseEnum.new(request, response, WatchUtxoResponseBuilder).each
rescue => e
logger.info("watch_utxo: #{e.message}")
logger.info("watch_utxo: #{e.backtrace}")
end
def watch_token(request, call)
logger.info("watch_token: #{request}")
utxo_handler << request
- channel = Concurrent::Channel.new(capacity: 100)
- Receiver.spawn(:receiver, channel, request, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered])
- ResponseEnum.new(request, channel, WatchTokenResponseBuilder).each
+ response = []
+ Receiver.spawn(:receiver, request, response, publisher, [Bitcoin::Grpc::EventTokenIssued, Bitcoin::Grpc::EventTokenTransfered])
logger.info("watch_token: end")
+ ResponseEnum.new(request, response, WatchTokenResponseBuilder).each
rescue => e
logger.info("watch_token: #{e.message}")
logger.info("watch_token: #{e.backtrace}")
end
end
@@ -90,42 +90,43 @@
end
class Receiver < Concurrent::Actor::Context
include Concurrent::Concern::Logging
- attr_reader :channel, :request
+ attr_reader :request, :response
- def initialize(channel, request, publisher, classes)
- @channel = channel
+ def initialize(request, response, publisher, classes)
@request = request
+ @response = response
classes.each {|c| publisher << [:subscribe, c] }
end
def on_message(message)
- log(::Logger::DEBUG, "Receiver#on_message:#{message}")
if request.id == message.request_id
log(::Logger::DEBUG, "Receiver#on_message:#{message}")
- channel << message
+ response << message
end
end
end
class ResponseEnum
- attr_reader :req, :channel, :wrapper_classs, :logger
+ attr_reader :req, :response, :wrapper_classs, :logger
- def initialize(req, channel, wrapper_classs)
+ def initialize(req, response, wrapper_classs)
@req = req
- @channel = channel
+ @response = response
@wrapper_classs = wrapper_classs
@logger = Bitcoin::Logger.create(:debug)
end
def each
- logger.info("ResponseEnum#each")
return enum_for(:each) unless block_given?
loop do
- event = channel.take
- channel.close
- yield wrapper_classs.build(event.request_id, event)
+ event = response.first
+ if event
+ yield wrapper_classs.build(event.request_id, event)
+ else
+ sleep(1)
+ end
end
end
end
end
end