lib/zold/node/spread_entrance.rb in zold-0.16.1 vs lib/zold/node/spread_entrance.rb in zold-0.16.2

- old
+ new

@@ -19,11 +19,10 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. require 'concurrent' -require 'concurrent/set' require 'tempfile' require_relative 'emission' require_relative '../log' require_relative '../remotes' require_relative '../copies' @@ -45,10 +44,11 @@ @wallets = wallets @remotes = remotes @address = address @log = log @ignore_score_weakeness = ignore_score_weakeness + @mutex = Mutex.new end def to_json @entrance.to_json.merge( 'modified': @modified.size, @@ -56,11 +56,11 @@ ) end def start @entrance.start do - @seen = Concurrent::Set.new + @seen = Set.new @modified = Queue.new @push = Thread.start do Thread.current.abort_on_exception = true Thread.current.name = 'push' VerboseThread.new(@log).run(true) do @@ -72,11 +72,11 @@ Push.new(wallets: @wallets, remotes: @remotes, log: @log).run( ['push', "--ignore-node=#{@address}", id.to_s] + (@ignore_score_weakeness ? ['--ignore-score-weakness'] : []) ) end - @seen.delete(id) + @mutex.synchronize { @seen.delete(id) } end end end begin yield(self) @@ -93,10 +93,10 @@ def push(id, body) mods = @entrance.push(id, body) return mods if @remotes.all.empty? (mods + [id]).each do |m| next if @seen.include?(m) - @seen << m + @mutex.synchronize { @seen << m } @modified.push(m) @log.debug("Spread-push scheduled for #{m}, queue size is #{@modified.size}") end mods end