lib/universa/client.rb in universa-0.1.6 vs lib/universa/client.rb in universa-0.1.7

- old
+ new

@@ -4,83 +4,14 @@ module Universa using Universa - def retry_with_timeout(max_timeout = 15, max_times = 3, &block) - attempt = 0 - Timeout::timeout(max_timeout, &block) - rescue - attempt += 1 - puts "timeout: retry (#$!): #{attempt}" - retry if attempt < max_times - raise - end - - - module Parallel - - class ParallelEnumerable < SimpleDelegator - include Concurrent - - @@pool = CachedThreadPool.new - - def each_with_index &block - latch = CountDownLatch.new(size) - __getobj__.each_with_index {|x, index| - @@pool << -> { - begin - block.call(x, index) - rescue - $!.print_stack_trace - ensure - latch.count_down - end - } - } - latch.wait - end - - def each &block - each_with_index {|x, i| block.call(x)} - end - - - def map &block - result = size.times.map {nil} - each_with_index {|value, i| - result[i] = block.call(value) - } - result.par - end - - alias_method :collect, :map - end - - refine Enumerable do - def par - is_a?(ParallelEnumerable) ? self : ParallelEnumerable.new(self) - end - - def group_by &block - result = {} - each {|value| - new_key = block.call(value) - (result[new_key] ||= []) << value - } - result - end - end - - end - - - using Parallel - # Universa network client reads current network configuration and provides access to each node independently # and also implement newtor-wide procedures. class Client + using Universa::Parallel # Create client # @param [PrivateKey] private_key to connect with. Generates new one if omitted. def initialize private_key = nil @connection_key = private_key @@ -100,10 +31,37 @@ # @return [Connection] random connection def random_connection @nodes.sample end + def register_single contract + random_connection.register_single contract + end + + # Perform fats consensus state check. E.g. it scans up to 2/3 of the network until + # the positive or negative consensus will be found. So far you can only rely on + # result.approved? as it returns some last node result which, though, match the + # consensus. Aggregation of parameters is under way. + # + # @param [Contract | HashId] obj to check + # @return [ContractState] of some final node check It does not aggregates (yet) + def get_state obj + result = Concurrent::IVar.new + negative_votes = Concurrent::AtomicFixnum.new(@nodes.size * 20 / 100) + positive_votes = Concurrent::AtomicFixnum.new(@nodes.size * 30 / 100) + random_connections(@nodes.size * 2 / 3).par.each {|conn| + if result.incomplete? + if (state = conn.get_state(obj)).approved? + result.try_set(state) if positive_votes.decrement < 0 + else + result.try_set(state) if negative_votes.decrement < 0 + end + end + } + result.value + end + # @return [Array(Connection)] array of count randomly selected connections def random_connections count = 1 @nodes.sample(count) end @@ -142,36 +100,43 @@ # The node information class NodeInfo attr :number, :packed_key, :url + # constructs from binary packed data def initialize(data) @data, @number, @url, @packed_key = data, data.number, data.url, data.packed_key @rate = Concurrent::AtomicFixnum.new end + # currently collected approval rate def rate @rate.value end + # increase approval rate def increment_rate @rate.increment end + # check information euqlity def == other # number == other.number && packed_key == other.packed_key && url == other.url url == other&.url && packed_key == other&.packed_key && url == other&.url end + # allows to use as hash key def hash @url.hash + @packed_key.hash end + # to use as hash key def eql?(other) self == other end + # ordered by approval rate def < other rate < other.rate end end @@ -195,19 +160,57 @@ # ciphering. def ping execute(:sping) end + # Register a single contract (on private network or if you have white key allowing free operations) + # on a single node. + # + # @param [Contract] contract, muts be sealed ({Contract#seal}) + # @return [ContractState] of the result. Could contain errors. + def register_single contract + result = ContractState.new(execute "approve", packedItem: contract.packed) + while result.is_pending + sleep(0.1) + result = get_state contract + end + result + end + + # Get contract or hashId state from this single node + # @param [Contract | HashId] x what to check + # @return [ContractState] + def get_state x + id = case x + when HashId + x + when Contract + x.hash_id + else + raise ArgumentError, "bad argument, want Contract or HashId" + end + ContractState.new(execute "getState", itemId: id) + end + + # Execute Universa Node client protocol command with optional keyword arguments that will be passed # to the node. # # @param [String|Symbol] name of the command # @return [SmartHash] with the command result def execute name, **kwargs connection.command name.to_s, *kwargs.to_a.flatten end + def to_s + "Conn<#{@node_info.url}>" + end + + def inspect + to_s + end + protected def connection @connection ||= retry_with_timeout(15, 3) { Service.umi.instantiate "com.icodici.universa.node2.network.Client", @@ -216,8 +219,53 @@ nil, false } end + end + + class ContractState + def initialize(universa_contract_state) + @source = universa_contract_state + end + + def errors + @source.errors&.map &:to_s + rescue + "failed to extract errors: #$!" + end + + def state + @source.itemResult.state + end + + def is_pending + state.start_with?('PENDING') + end + + def is_approved + case state + when 'APPROVED', 'LOCKED' + true + else + false + end + end + + def approved? + is_approved + end + + def pending? + is_pending + end + + def to_s + "ContractState:#{state}" + end + + def inspect + to_s + end end end