lib/universa/client.rb in universa-0.1.6 vs lib/universa/client.rb in universa-0.1.7
- old
+ new
@@ -4,83 +4,14 @@
module Universa
using Universa
- def retry_with_timeout(max_timeout = 15, max_times = 3, &block)
- attempt = 0
- Timeout::timeout(max_timeout, &block)
- rescue
- attempt += 1
- puts "timeout: retry (#$!): #{attempt}"
- retry if attempt < max_times
- raise
- end
-
-
- module Parallel
-
- class ParallelEnumerable < SimpleDelegator
- include Concurrent
-
- @@pool = CachedThreadPool.new
-
- def each_with_index &block
- latch = CountDownLatch.new(size)
- __getobj__.each_with_index {|x, index|
- @@pool << -> {
- begin
- block.call(x, index)
- rescue
- $!.print_stack_trace
- ensure
- latch.count_down
- end
- }
- }
- latch.wait
- end
-
- def each &block
- each_with_index {|x, i| block.call(x)}
- end
-
-
- def map &block
- result = size.times.map {nil}
- each_with_index {|value, i|
- result[i] = block.call(value)
- }
- result.par
- end
-
- alias_method :collect, :map
- end
-
- refine Enumerable do
- def par
- is_a?(ParallelEnumerable) ? self : ParallelEnumerable.new(self)
- end
-
- def group_by &block
- result = {}
- each {|value|
- new_key = block.call(value)
- (result[new_key] ||= []) << value
- }
- result
- end
- end
-
- end
-
-
- using Parallel
-
# Universa network client reads current network configuration and provides access to each node independently
# and also implement newtor-wide procedures.
class Client
+ using Universa::Parallel
# Create client
# @param [PrivateKey] private_key to connect with. Generates new one if omitted.
def initialize private_key = nil
@connection_key = private_key
@@ -100,10 +31,37 @@
# @return [Connection] random connection
def random_connection
@nodes.sample
end
+ def register_single contract
+ random_connection.register_single contract
+ end
+
+ # Perform fats consensus state check. E.g. it scans up to 2/3 of the network until
+ # the positive or negative consensus will be found. So far you can only rely on
+ # result.approved? as it returns some last node result which, though, match the
+ # consensus. Aggregation of parameters is under way.
+ #
+ # @param [Contract | HashId] obj to check
+ # @return [ContractState] of some final node check It does not aggregates (yet)
+ def get_state obj
+ result = Concurrent::IVar.new
+ negative_votes = Concurrent::AtomicFixnum.new(@nodes.size * 20 / 100)
+ positive_votes = Concurrent::AtomicFixnum.new(@nodes.size * 30 / 100)
+ random_connections(@nodes.size * 2 / 3).par.each {|conn|
+ if result.incomplete?
+ if (state = conn.get_state(obj)).approved?
+ result.try_set(state) if positive_votes.decrement < 0
+ else
+ result.try_set(state) if negative_votes.decrement < 0
+ end
+ end
+ }
+ result.value
+ end
+
# @return [Array(Connection)] array of count randomly selected connections
def random_connections count = 1
@nodes.sample(count)
end
@@ -142,36 +100,43 @@
# The node information
class NodeInfo
attr :number, :packed_key, :url
+ # constructs from binary packed data
def initialize(data)
@data, @number, @url, @packed_key = data, data.number, data.url, data.packed_key
@rate = Concurrent::AtomicFixnum.new
end
+ # currently collected approval rate
def rate
@rate.value
end
+ # increase approval rate
def increment_rate
@rate.increment
end
+ # check information euqlity
def == other
# number == other.number && packed_key == other.packed_key && url == other.url
url == other&.url && packed_key == other&.packed_key && url == other&.url
end
+ # allows to use as hash key
def hash
@url.hash + @packed_key.hash
end
+ # to use as hash key
def eql?(other)
self == other
end
+ # ordered by approval rate
def < other
rate < other.rate
end
end
@@ -195,19 +160,57 @@
# ciphering.
def ping
execute(:sping)
end
+ # Register a single contract (on private network or if you have white key allowing free operations)
+ # on a single node.
+ #
+ # @param [Contract] contract, muts be sealed ({Contract#seal})
+ # @return [ContractState] of the result. Could contain errors.
+ def register_single contract
+ result = ContractState.new(execute "approve", packedItem: contract.packed)
+ while result.is_pending
+ sleep(0.1)
+ result = get_state contract
+ end
+ result
+ end
+
+ # Get contract or hashId state from this single node
+ # @param [Contract | HashId] x what to check
+ # @return [ContractState]
+ def get_state x
+ id = case x
+ when HashId
+ x
+ when Contract
+ x.hash_id
+ else
+ raise ArgumentError, "bad argument, want Contract or HashId"
+ end
+ ContractState.new(execute "getState", itemId: id)
+ end
+
+
# Execute Universa Node client protocol command with optional keyword arguments that will be passed
# to the node.
#
# @param [String|Symbol] name of the command
# @return [SmartHash] with the command result
def execute name, **kwargs
connection.command name.to_s, *kwargs.to_a.flatten
end
+ def to_s
+ "Conn<#{@node_info.url}>"
+ end
+
+ def inspect
+ to_s
+ end
+
protected
def connection
@connection ||= retry_with_timeout(15, 3) {
Service.umi.instantiate "com.icodici.universa.node2.network.Client",
@@ -216,8 +219,53 @@
nil,
false
}
end
+ end
+
+ class ContractState
+ def initialize(universa_contract_state)
+ @source = universa_contract_state
+ end
+
+ def errors
+ @source.errors&.map &:to_s
+ rescue
+ "failed to extract errors: #$!"
+ end
+
+ def state
+ @source.itemResult.state
+ end
+
+ def is_pending
+ state.start_with?('PENDING')
+ end
+
+ def is_approved
+ case state
+ when 'APPROVED', 'LOCKED'
+ true
+ else
+ false
+ end
+ end
+
+ def approved?
+ is_approved
+ end
+
+ def pending?
+ is_pending
+ end
+
+ def to_s
+ "ContractState:#{state}"
+ end
+
+ def inspect
+ to_s
+ end
end
end