lib/zold/node/spread_entrance.rb in zold-0.15.0 vs lib/zold/node/spread_entrance.rb in zold-0.16.0

- old
+ new

@@ -19,10 +19,11 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. require 'concurrent' +require 'concurrent/set' require 'tempfile' require_relative 'emission' require_relative '../log' require_relative '../remotes' require_relative '../copies' @@ -38,21 +39,14 @@ # License:: MIT module Zold # The entrance class SpreadEntrance def initialize(entrance, wallets, remotes, address, log: Log::Quiet.new, ignore_score_weakeness: false) - raise 'Entrance can\'t be nil' if entrance.nil? @entrance = entrance - raise 'Wallets can\'t be nil' if wallets.nil? - raise 'Wallets must implement the contract of Wallets: method #find is required' unless wallets.respond_to?(:find) @wallets = wallets - raise 'Remotes can\'t be nil' if remotes.nil? - raise 'Remotes must be of type Remotes' unless remotes.is_a?(Remotes) @remotes = remotes - raise 'Address can\'t be nil' if address.nil? @address = address - raise 'Log can\'t be nil' if log.nil? @log = log @ignore_score_weakeness = ignore_score_weakeness end def to_json @@ -62,11 +56,11 @@ ) end def start @entrance.start do - @seen = Set.new + @seen = Concurrent::Set.new @modified = Queue.new @push = Thread.start do Thread.current.abort_on_exception = true Thread.current.name = 'push' VerboseThread.new(@log).run(true) do @@ -93,16 +87,18 @@ @log.info('Spread entrance finished, thread killed') end end end + # This method is thread-safe def push(id, body) mods = @entrance.push(id, body) + return mods if @remotes.all.empty? (mods + [id]).each do |m| next if @seen.include?(m) @seen << m @modified.push(m) - @log.debug("Push scheduled for #{m}, queue size is #{@modified.size}") + @log.debug("Spread-push scheduled for #{m}, queue size is #{@modified.size}") end mods end end end