lib/zold/node/spread_entrance.rb in zold-0.16.1 vs lib/zold/node/spread_entrance.rb in zold-0.16.2
- old
+ new
@@ -19,11 +19,10 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
require 'concurrent'
-require 'concurrent/set'
require 'tempfile'
require_relative 'emission'
require_relative '../log'
require_relative '../remotes'
require_relative '../copies'
@@ -45,10 +44,11 @@
@wallets = wallets
@remotes = remotes
@address = address
@log = log
@ignore_score_weakeness = ignore_score_weakeness
+ @mutex = Mutex.new
end
def to_json
@entrance.to_json.merge(
'modified': @modified.size,
@@ -56,11 +56,11 @@
)
end
def start
@entrance.start do
- @seen = Concurrent::Set.new
+ @seen = Set.new
@modified = Queue.new
@push = Thread.start do
Thread.current.abort_on_exception = true
Thread.current.name = 'push'
VerboseThread.new(@log).run(true) do
@@ -72,11 +72,11 @@
Push.new(wallets: @wallets, remotes: @remotes, log: @log).run(
['push', "--ignore-node=#{@address}", id.to_s] +
(@ignore_score_weakeness ? ['--ignore-score-weakness'] : [])
)
end
- @seen.delete(id)
+ @mutex.synchronize { @seen.delete(id) }
end
end
end
begin
yield(self)
@@ -93,10 +93,10 @@
def push(id, body)
mods = @entrance.push(id, body)
return mods if @remotes.all.empty?
(mods + [id]).each do |m|
next if @seen.include?(m)
- @seen << m
+ @mutex.synchronize { @seen << m }
@modified.push(m)
@log.debug("Spread-push scheduled for #{m}, queue size is #{@modified.size}")
end
mods
end