lib/zold/node/spread_entrance.rb in zold-0.15.0 vs lib/zold/node/spread_entrance.rb in zold-0.16.0
- old
+ new
@@ -19,10 +19,11 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
require 'concurrent'
+require 'concurrent/set'
require 'tempfile'
require_relative 'emission'
require_relative '../log'
require_relative '../remotes'
require_relative '../copies'
@@ -38,21 +39,14 @@
# License:: MIT
module Zold
# The entrance
class SpreadEntrance
def initialize(entrance, wallets, remotes, address, log: Log::Quiet.new, ignore_score_weakeness: false)
- raise 'Entrance can\'t be nil' if entrance.nil?
@entrance = entrance
- raise 'Wallets can\'t be nil' if wallets.nil?
- raise 'Wallets must implement the contract of Wallets: method #find is required' unless wallets.respond_to?(:find)
@wallets = wallets
- raise 'Remotes can\'t be nil' if remotes.nil?
- raise 'Remotes must be of type Remotes' unless remotes.is_a?(Remotes)
@remotes = remotes
- raise 'Address can\'t be nil' if address.nil?
@address = address
- raise 'Log can\'t be nil' if log.nil?
@log = log
@ignore_score_weakeness = ignore_score_weakeness
end
def to_json
@@ -62,11 +56,11 @@
)
end
def start
@entrance.start do
- @seen = Set.new
+ @seen = Concurrent::Set.new
@modified = Queue.new
@push = Thread.start do
Thread.current.abort_on_exception = true
Thread.current.name = 'push'
VerboseThread.new(@log).run(true) do
@@ -93,16 +87,18 @@
@log.info('Spread entrance finished, thread killed')
end
end
end
+ # This method is thread-safe
def push(id, body)
mods = @entrance.push(id, body)
+ return mods if @remotes.all.empty?
(mods + [id]).each do |m|
next if @seen.include?(m)
@seen << m
@modified.push(m)
- @log.debug("Push scheduled for #{m}, queue size is #{@modified.size}")
+ @log.debug("Spread-push scheduled for #{m}, queue size is #{@modified.size}")
end
mods
end
end
end