lib/grumlin/repository/instance_methods.rb in grumlin-0.23.0 vs lib/grumlin/repository/instance_methods.rb in grumlin-1.0.0.rc1
- old
+ new
@@ -1,158 +1,155 @@
# frozen_string_literal: true
-module Grumlin
- module Repository
- module InstanceMethods # rubocop:disable Metrics/ModuleLength
- include Grumlin::Expressions
+module Grumlin::Repository::InstanceMethods # rubocop:disable Metrics/ModuleLength
+ include Grumlin::Expressions
- extend Forwardable
+ extend Forwardable
- UPSERT_RETRY_PARAMS = {
- on: [Grumlin::AlreadyExistsError, Grumlin::ConcurrentModificationError],
- sleep_method: ->(n) { Async::Task.current.sleep(n) },
- tries: 5,
- sleep: ->(n) { (n**2) + 1 + rand }
- }.freeze
+ def_delegator "self.class", :shortcuts
+ def_delegator :self, :__, :g
- DEFAULT_ERROR_HANDLING_STRATEGY = ErrorHandlingStrategy.new(mode: :retry, **UPSERT_RETRY_PARAMS)
+ UPSERT_RETRY_PARAMS = {
+ on: [Grumlin::AlreadyExistsError, Grumlin::ConcurrentModificationError],
+ sleep_method: ->(n) { Async::Task.current.sleep(n) },
+ tries: 5,
+ sleep: ->(n) { (n**2) + 1 + rand }
+ }.freeze
- def_delegators :shortcuts, :g, :__
+ DEFAULT_ERROR_HANDLING_STRATEGY = Grumlin::Repository::ErrorHandlingStrategy.new(mode: :retry, **UPSERT_RETRY_PARAMS)
- def shortcuts
- self.class.shortcuts
- end
+ def __
+ shortcuts.traversal_start_class.new(pool: Grumlin.default_pool, middlewares: self.class.middlewares)
+ end
- def drop_vertex(id, start: g)
- start.V(id).drop.iterate
- end
+ def drop_vertex(id, start: g)
+ start.V(id).drop.iterate
+ end
- def drop_in_batches(traversal, batch_size: 10_000) # rubocop:disable Metrics/AbcSize
- total_count = traversal.count.next
+ def drop_in_batches(traversal, batch_size: 10_000) # rubocop:disable Metrics/AbcSize
+ total_count = traversal.count.next
- batches = (total_count / batch_size) + 1
+ batches = (total_count / batch_size) + 1
- Console.logger.info(self) do
- "drop_in_batches: total_count: #{total_count}, batch_size: #{batch_size}, batches: #{batches}"
- end
+ Console.logger.info(self) do
+ "drop_in_batches: total_count: #{total_count}, batch_size: #{batch_size}, batches: #{batches}"
+ end
- batches.times do |batch|
- Console.logger.info(self) { "drop_in_batches: deleting batch #{batch + 1}/#{batches}..." }
- traversal.limit(batch_size).drop.iterate
- Console.logger.info(self) { "drop_in_batches: batch #{batch + 1}/#{batches} deleted" }
- end
+ batches.times do |batch|
+ Console.logger.info(self) { "drop_in_batches: deleting batch #{batch + 1}/#{batches}..." }
+ traversal.limit(batch_size).drop.iterate
+ Console.logger.info(self) { "drop_in_batches: batch #{batch + 1}/#{batches} deleted" }
+ end
- return if traversal.count.next.zero?
+ return if traversal.count.next.zero?
- drop_in_batches(traversal, batch_size: batch_size)
+ drop_in_batches(traversal, batch_size: batch_size)
- Console.logger.info(self) { "drop_in_batches: finished." }
- end
+ Console.logger.info(self) { "drop_in_batches: finished." }
+ end
- def drop_edge(id = nil, from: nil, to: nil, label: nil, start: g) # rubocop:disable Metrics/AbcSize
- raise ArgumentError, "either id or from:, to: and label: must be passed" if [id, from, to, label].all?(&:nil?)
- return start.E(id).drop.iterate unless id.nil?
+ def drop_edge(id = nil, from: nil, to: nil, label: nil, start: g) # rubocop:disable Metrics/AbcSize
+ raise ArgumentError, "either id or from:, to: and label: must be passed" if [id, from, to, label].all?(&:nil?)
+ return start.E(id).drop.iterate unless id.nil?
- raise ArgumentError, "from:, to: and label: must be passed" if [from, to, label].any?(&:nil?)
+ raise ArgumentError, "from:, to: and label: must be passed" if [from, to, label].any?(&:nil?)
- start.V(from).outE(label).where(__.inV.hasId(to)).limit(1).drop.iterate
- end
+ start.V(from).outE(label).where(__.inV.hasId(to)).limit(1).drop.iterate
+ end
- def add_vertex(label, id = nil, start: g, **properties)
- id ||= properties[T.id]
- properties = except(properties, T.id)
+ def add_vertex(label, id = nil, start: g, **properties)
+ id ||= properties[T.id]
+ properties = except(properties, T.id)
- t = start.addV(label)
- t = t.props(T.id => id) unless id.nil?
- t.props(**properties).next
- end
+ t = start.addV(label)
+ t = t.props(T.id => id) unless id.nil?
+ t.props(**properties).next
+ end
- def add_edge(label, id = nil, from:, to:, start: g, **properties)
- id ||= properties[T.id]
- properties = except(properties, T.label)
- properties[T.id] = id
+ def add_edge(label, id = nil, from:, to:, start: g, **properties)
+ id ||= properties[T.id]
+ properties = except(properties, T.label)
+ properties[T.id] = id
- start.addE(label).from(__.V(from)).to(__.V(to)).props(**properties).next
- end
+ start.addE(label).from(__.V(from)).to(__.V(to)).props(**properties).next
+ end
- def upsert_vertex(label, id, create_properties: {}, update_properties: {}, on_failure: :retry, start: g, **params) # rubocop:disable Metrics/ParameterLists
- with_upsert_error_handling(on_failure, params) do
- create_properties, update_properties = cleanup_properties(create_properties, update_properties)
+ def upsert_vertex(label, id, create_properties: {}, update_properties: {}, on_failure: :retry, start: g, **params) # rubocop:disable Metrics/ParameterLists
+ with_upsert_error_handling(on_failure, params) do
+ create_properties, update_properties = cleanup_properties(create_properties, update_properties)
- start.upsertV(label, id, create_properties, update_properties).id.next
- end
- end
+ start.upsertV(label, id, create_properties, update_properties).id.next
+ end
+ end
- # vertices:
- # [["label", "id", {create: :properties}, {update: properties}]]
- # params can override Retryable config from UPSERT_RETRY_PARAMS
- def upsert_vertices(vertices, batch_size: 100, on_failure: :retry, start: g, **params)
- vertices.each_slice(batch_size) do |slice|
- with_upsert_error_handling(on_failure, params) do
- slice.reduce(start) do |t, (label, id, create_properties, update_properties)|
- create_properties, update_properties = cleanup_properties(create_properties, update_properties)
+ # vertices:
+ # [["label", "id", {create: :properties}, {update: properties}]]
+ # params can override Retryable config from UPSERT_RETRY_PARAMS
+ def upsert_vertices(vertices, batch_size: 100, on_failure: :retry, start: g, **params)
+ vertices.each_slice(batch_size) do |slice|
+ with_upsert_error_handling(on_failure, params) do
+ slice.reduce(start) do |t, (label, id, create_properties, update_properties)|
+ create_properties, update_properties = cleanup_properties(create_properties, update_properties)
- t.upsertV(label, id, create_properties, update_properties)
- end.id.iterate
- end
- end
+ t.upsertV(label, id, create_properties, update_properties)
+ end.id.iterate
end
+ end
+ end
- # Only from and to are used to find the existing edge, if one wants to assign an id to a created edge,
- # it must be passed as T.id in create_properties.
- def upsert_edge(label, from:, to:, create_properties: {}, update_properties: {}, # rubocop:disable Metrics/ParameterLists
- on_failure: :retry, start: g, **params)
- with_upsert_error_handling(on_failure, params) do
+ # Only from and to are used to find the existing edge, if one wants to assign an id to a created edge,
+ # it must be passed as T.id in create_properties.
+ def upsert_edge(label, from:, to:, create_properties: {}, update_properties: {}, # rubocop:disable Metrics/ParameterLists
+ on_failure: :retry, start: g, **params)
+ with_upsert_error_handling(on_failure, params) do
+ create_properties, update_properties = cleanup_properties(create_properties, update_properties, T.label)
+ start.upsertE(label, from, to, create_properties, update_properties).id.next
+ end
+ end
+
+ # edges:
+ # [["label", "from", "to", {create: :properties}, {update: properties}]]
+ # params can override Retryable config from UPSERT_RETRY_PARAMS
+ def upsert_edges(edges, batch_size: 100, on_failure: :retry, start: g, **params)
+ edges.each_slice(batch_size) do |slice|
+ with_upsert_error_handling(on_failure, params) do
+ slice.reduce(start) do |t, (label, from, to, create_properties, update_properties)|
create_properties, update_properties = cleanup_properties(create_properties, update_properties, T.label)
- start.upsertE(label, from, to, create_properties, update_properties).id.next
- end
- end
- # edges:
- # [["label", "from", "to", {create: :properties}, {update: properties}]]
- # params can override Retryable config from UPSERT_RETRY_PARAMS
- def upsert_edges(edges, batch_size: 100, on_failure: :retry, start: g, **params)
- edges.each_slice(batch_size) do |slice|
- with_upsert_error_handling(on_failure, params) do
- slice.reduce(start) do |t, (label, from, to, create_properties, update_properties)|
- create_properties, update_properties = cleanup_properties(create_properties, update_properties, T.label)
-
- t.upsertE(label, from, to, create_properties, update_properties)
- end.id.iterate
- end
- end
+ t.upsertE(label, from, to, create_properties, update_properties)
+ end.id.iterate
end
+ end
+ end
- private
+ private
- def with_upsert_error_handling(on_failure, params, &block)
- if params.any?
- ErrorHandlingStrategy.new(mode: on_failure, **UPSERT_RETRY_PARAMS.merge(params))
- else
- DEFAULT_ERROR_HANDLING_STRATEGY
- end.apply!(&block)
- end
+ def with_upsert_error_handling(on_failure, params, &block)
+ if params.any?
+ ErrorHandlingStrategy.new(mode: on_failure, **UPSERT_RETRY_PARAMS.merge(params))
+ else
+ DEFAULT_ERROR_HANDLING_STRATEGY
+ end.apply!(&block)
+ end
- def with_upsert_retry(retry_params, &block)
- retry_params = UPSERT_RETRY_PARAMS.merge((retry_params))
- Retryable.retryable(**retry_params, &block)
- end
+ def with_upsert_retry(retry_params, &block)
+ retry_params = UPSERT_RETRY_PARAMS.merge((retry_params))
+ Retryable.retryable(**retry_params, &block)
+ end
- # A polyfill for Hash#except for ruby 2.x environments without ActiveSupport
- # TODO: delete and use native Hash#except after ruby 2.7 is deprecated.
- def except(hash, *keys)
- return hash.except(*keys) if hash.respond_to?(:except)
+ # A polyfill for Hash#except for ruby 2.x environments without ActiveSupport
+ # TODO: delete and use native Hash#except after ruby 2.7 is deprecated.
+ def except(hash, *keys)
+ return hash.except(*keys) if hash.respond_to?(:except)
- hash.each_with_object({}) do |(k, v), res|
- res[k] = v unless keys.include?(k)
- end
- end
+ hash.each_with_object({}) do |(k, v), res|
+ res[k] = v unless keys.include?(k)
+ end
+ end
- def cleanup_properties(create_properties, update_properties, *props_to_cleanup)
- props_to_cleanup = [T.id, T.label] if props_to_cleanup.empty?
- [create_properties, update_properties].map do |props|
- except(props, props_to_cleanup)
- end
- end
+ def cleanup_properties(create_properties, update_properties, *props_to_cleanup)
+ props_to_cleanup = [T.id, T.label] if props_to_cleanup.empty?
+ [create_properties, update_properties].map do |props|
+ except(props, props_to_cleanup)
end
end
end