# frozen_string_literal: true require_relative 'key_fields_detector' require_relative 'nonexistent_fields_detector' require_relative 'where_conditions' module Dynamoid module Criteria # The criteria chain is equivalent to an ActiveRecord relation (and realistically I should change the name from # chain to relation). It is a chainable object that builds up a query and eventually executes it by a Query or Scan. class Chain attr_reader :source, :consistent_read, :key_fields_detector include Enumerable ALLOWED_FIELD_OPERATORS = Set.new( %w[ eq ne gt lt gte lte between begins_with in contains not_contains null not_null ] ).freeze # Create a new criteria chain. # # @param [Class] source the class upon which the ultimate query will be performed. def initialize(source) @where_conditions = WhereConditions.new @source = source @consistent_read = false @scan_index_forward = true # we should re-initialize keys detector every time we change @where_conditions @key_fields_detector = KeyFieldsDetector.new(@where_conditions, @source) end # Returns a chain which is a result of filtering current chain with the specified conditions. # # It accepts conditions in the form of a hash. # # Post.where(links_count: 2) # # A key could be either string or symbol. # # In order to express conditions other than equality predicates could be used. # Predicate should be added to an attribute name to form a key +'created_at.gt' => Date.yesterday+ # # Currently supported following predicates: # - +gt+ - greater than # - +gte+ - greater or equal # - +lt+ - less than # - +lte+ - less or equal # - +ne+ - not equal # - +between+ - an attribute value is greater than the first value and less than the second value # - +in+ - check an attribute in a list of values # - +begins_with+ - check for a prefix in string # - +contains+ - check substring or value in a set or array # - +not_contains+ - check for absence of substring or a value in set or array # - +null+ - attribute doesn't exists in an item # - +not_null+ - attribute exists in an item # # All the predicates match operators supported by DynamoDB's # {ComparisonOperator}[https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Condition.html#DDB-Type-Condition-ComparisonOperator] # # Post.where('size.gt' => 1000) # Post.where('size.gte' => 1000) # Post.where('size.lt' => 35000) # Post.where('size.lte' => 35000) # Post.where('author.ne' => 'John Doe') # Post.where('created_at.between' => [Time.now - 3600, Time.now]) # Post.where('category.in' => ['tech', 'fashion']) # Post.where('title.begins_with' => 'How long') # Post.where('tags.contains' => 'Ruby') # Post.where('tags.not_contains' => 'Ruby on Rails') # Post.where('legacy_attribute.null' => true) # Post.where('optional_attribute.not_null' => true) # # There are some limitations for a sort key. Only following predicates # are supported - +gt+, +gte+, +lt+, +lte+, +between+, +begins_with+. # # +where+ without argument will return the current chain. # # Multiple calls can be chained together and conditions will be merged: # # Post.where('size.gt' => 1000).where('title' => 'some title') # # It's equivalent to: # # Post.where('size.gt' => 1000, 'title' => 'some title') # # But only one condition can be specified for a certain attribute. The # last specified condition will override all the others. Only condition # 'size.lt' => 200 will be used in following examples: # # Post.where('size.gt' => 100, 'size.lt' => 200) # Post.where('size.gt' => 100).where('size.lt' => 200) # # Internally +where+ performs either +Scan+ or +Query+ operation. # # @return [Dynamoid::Criteria::Chain] # @since 0.2.0 def where(args) detector = NonexistentFieldsDetector.new(args, @source) if detector.found? Dynamoid.logger.warn(detector.warning_message) end @where_conditions.update(args.symbolize_keys) # we should re-initialize keys detector every time we change @where_conditions @key_fields_detector = KeyFieldsDetector.new(@where_conditions, @source, forced_index_name: @forced_index_name) self end # Turns on strongly consistent reads. # # By default reads are eventually consistent. # # Post.where('size.gt' => 1000).consistent # # @return [Dynamoid::Criteria::Chain] def consistent @consistent_read = true self end # Returns all the records matching the criteria. # # Since +where+ and most of the other methods return a +Chain+ # the only way to get a result as a collection is to call the +all+ # method. It returns +Enumerator+ which could be used directly or # transformed into +Array+ # # Post.all # => Enumerator # Post.where(links_count: 2).all # => Enumerator # Post.where(links_count: 2).all.to_a # => Array # # When the result set is too large DynamoDB divides it into separate # pages. While an enumerator iterates over the result models each page # is loaded lazily. So even an extra large result set can be loaded and # processed with considerably small memory footprint and throughput # consumption. # # @return [Enumerator::Lazy] # @since 0.2.0 def all records end # Returns the actual number of items in a table matching the criteria. # # Post.where(links_count: 2).count # # Internally it uses either `Scan` or `Query` DynamoDB's operation so it # costs like all the matching items were read from a table. # # The only difference is that items are read by DynemoDB but not actually # loaded on the client side. DynamoDB returns only count of items after # filtering. # # @return [Integer] def count if @key_fields_detector.key_present? count_via_query else count_via_scan end end # Returns the first item matching the criteria. # # Post.where(links_count: 2).first # # Applies `record_limit(1)` to ensure only a single record is fetched # when no non-key conditions are present and `scan_limit(1)` when no # conditions are present at all. # # If used without criteria it just returns the first item of some # arbitrary order. # # Post.first # # @return [Model|nil] def first(*args) n = args.first || 1 return dup.scan_limit(n).to_a.first(*args) if @where_conditions.empty? return super if @key_fields_detector.non_key_present? dup.record_limit(n).to_a.first(*args) end # Returns the last item matching the criteria. # # Post.where(links_count: 2).last # # DynamoDB doesn't support ordering by some arbitrary attribute except a # sort key. So this method is mostly useful during development and # testing. # # If used without criteria it just returns the last item of some arbitrary order. # # Post.last # # It isn't efficient from the performance point of view as far as it reads and # loads all the filtered items from DynamoDB. # # @return [Model|nil] def last all.to_a.last end # Deletes all the items matching the criteria. # # Post.where(links_count: 2).delete_all # # If called without criteria then it deletes all the items in a table. # # Post.delete_all # # It loads all the items either with +Scan+ or +Query+ operation and # deletes them in batch with +BatchWriteItem+ operation. +BatchWriteItem+ # is limited by request size and items count so it's quite possible the # deletion will require several +BatchWriteItem+ calls. def delete_all ids = [] ranges = [] if @key_fields_detector.key_present? Dynamoid.adapter.query(source.table_name, query_key_conditions, query_non_key_conditions, query_options).flat_map { |i| i }.collect do |hash| ids << hash[source.hash_key.to_sym] ranges << hash[source.range_key.to_sym] if source.range_key end else Dynamoid.adapter.scan(source.table_name, scan_conditions, scan_options).flat_map { |i| i }.collect do |hash| ids << hash[source.hash_key.to_sym] ranges << hash[source.range_key.to_sym] if source.range_key end end Dynamoid.adapter.delete(source.table_name, ids, range_key: ranges.presence) end alias destroy_all delete_all # Set the record limit. # # The record limit is the limit of evaluated items returned by the # +Query+ or +Scan+. In other words it's how many items should be # returned in response. # # Post.where(links_count: 2).record_limit(1000) # => 1000 models # Post.record_limit(1000) # => 1000 models # # It could be very inefficient in terms of HTTP requests in pathological # cases. DynamoDB doesn't support out of the box the limits for items # count after filtering. So it's possible to make a lot of HTTP requests # to find items matching criteria and skip not matching. It means that # the cost (read capacity units) is unpredictable. # # Because of such issues with performance and cost it's mostly useful in # development and testing. # # When called without criteria it works like +scan_limit+. # # @return [Dynamoid::Criteria::Chain] def record_limit(limit) @record_limit = limit self end # Set the scan limit. # # The scan limit is the limit of records that DynamoDB will internally # read with +Query+ or +Scan+. It's different from the record limit as # with filtering DynamoDB may look at N scanned items but return 0 # items if none passes the filter. So it can return less items than was # specified with the limit. # # Post.where(links_count: 2).scan_limit(1000) # => 850 models # Post.scan_limit(1000) # => 1000 models # # By contrast with +record_limit+ the cost (read capacity units) and # performance is predictable. # # When called without criteria it works like +record_limit+. # # @return [Dynamoid::Criteria::Chain] def scan_limit(limit) @scan_limit = limit self end # Set the batch size. # # The batch size is a number of items which will be lazily loaded one by one. # When the batch size is set then items will be loaded batch by batch of # the specified size instead of relying on the default paging mechanism # of DynamoDB. # # Post.where(links_count: 2).batch(1000).all.each do |post| # # process a post # end # # It's useful to limit memory usage or throughput consumption # # @return [Dynamoid::Criteria::Chain] def batch(batch_size) @batch_size = batch_size self end # Set the start item. # # When the start item is set the items will be loaded starting right # after the specified item. # # Post.where(links_count: 2).start(post) # # It can be used to implement an own pagination mechanism. # # Post.where(author_id: author_id).start(last_post).scan_limit(50) # # The specified start item will not be returned back in a result set. # # Actually it doesn't need all the item attributes to start - an item may # have only the primary key attributes (partition and sort key if it's # declared). # # Post.where(links_count: 2).start(Post.new(id: id)) # # It also supports a +Hash+ argument with the keys attributes - a # partition key and a sort key (if it's declared). # # Post.where(links_count: 2).start(id: id) # # @return [Dynamoid::Criteria::Chain] def start(start) @start = start self end # Reverse the sort order. # # By default the sort order is ascending (by the sort key value). Set a # +false+ value to reverse the order. # # Post.where(id: id, 'views_count.gt' => 1000).scan_index_forward(false) # # It works only for queries with a partition key condition e.g. +id: # 'some-id'+ which internally performs +Query+ operation. # # @return [Dynamoid::Criteria::Chain] def scan_index_forward(scan_index_forward) @scan_index_forward = scan_index_forward self end # Force the index name to use for queries. # # By default allows the library to select the most appropriate index. # Sometimes you have more than one index which will fulfill your query's # needs. When this case occurs you may want to force an order. This occurs # when you are searching by hash key, but not specifying a range key. # # class Comment # include Dynamoid::Document # # table key: :post_id # range_key :author_id # # field :post_date, :datetime # # global_secondary_index name: :time_sorted_comments, hash_key: :post_id, range_key: post_date, projected_attributes: :all # end # # # Comment.where(post_id: id).with_index(:time_sorted_comments).scan_index_forward(false) # # @return [Dynamoid::Criteria::Chain] def with_index(index_name) raise Dynamoid::Errors::InvalidIndex, "Unknown index #{index_name}" unless @source.find_index_by_name(index_name) @forced_index_name = index_name @key_fields_detector = KeyFieldsDetector.new(@where_conditions, @source, forced_index_name: index_name) self end # Allows to use the results of a search as an enumerable over the results # found. # # Post.each do |post| # end # # Post.all.each do |post| # end # # Post.where(links_count: 2).each do |post| # end # # It works similar to the +all+ method so results are loaded lazily. # # @since 0.2.0 def each(&block) records.each(&block) end # Iterates over the pages returned by DynamoDB. # # DynamoDB has its own paging machanism and divides a large result set # into separate pages. The +find_by_pages+ method provides access to # these native DynamoDB pages. # # The pages are loaded lazily. # # Post.where('views_count.gt' => 1000).find_by_pages do |posts, options| # # process posts # end # # It passes as block argument an +Array+ of models and a Hash with options. # # Options +Hash+ contains only one option +:last_evaluated_key+. The last # evaluated key is a Hash with key attributes of the last item processed by # DynamoDB. It can be used to resume querying using the +start+ method. # # posts, options = Post.where('views_count.gt' => 1000).find_by_pages.first # last_key = options[:last_evaluated_key] # # # ... # # Post.where('views_count.gt' => 1000).start(last_key).find_by_pages do |posts, options| # end # # If it's called without a block then it returns an +Enumerator+. # # enum = Post.where('views_count.gt' => 1000).find_by_pages # # enum.each do |posts, options| # # process posts # end # # @return [Enumerator::Lazy] def find_by_pages(&block) pages.each(&block) end # Select only specified fields. # # It takes one or more field names and returns a collection of models with only # these fields set. # # Post.where('views_count.gt' => 1000).project(:title) # Post.where('views_count.gt' => 1000).project(:title, :created_at) # Post.project(:id) # # It can be used to avoid loading large field values and to decrease a # memory footprint. # # @return [Dynamoid::Criteria::Chain] def project(*fields) @project = fields.map(&:to_sym) self end # Select only specified fields. # # It takes one or more field names and returns an array of either values # or arrays of values. # # Post.pluck(:id) # => ['1', '2'] # Post.pluck(:title, :title) # => [['1', 'Title #1'], ['2', 'Title#2']] # # Post.where('views_count.gt' => 1000).pluck(:title) # # There are some differences between +pluck+ and +project+. +pluck+ # - doesn't instantiate models # - it isn't chainable and returns +Array+ instead of +Chain+ # # It deserializes values if a field type isn't supported by DynamoDB natively. # # It can be used to avoid loading large field values and to decrease a # memory footprint. # # @return [Array] def pluck(*args) fields = args.map(&:to_sym) # `project` has a side effect - it sets `@project` instance variable. # So use a duplicate to not pollute original chain. scope = dup scope.project(*fields) if fields.many? scope.items.map do |item| fields.map { |key| Undumping.undump_field(item[key], source.attributes[key]) } end.to_a else key = fields.first scope.items.map { |item| Undumping.undump_field(item[key], source.attributes[key]) }.to_a end end private # The actual records referenced by the association. # # @return [Enumerator] an iterator of the found records. # # @since 0.2.0 def records pages.lazy.flat_map { |items, _| items } end # Raw items like they are stored before type casting def items raw_pages.lazy.flat_map { |items, _| items } end protected :items # Arrays of records, sized based on the actual pages produced by DynamoDB # # @return [Enumerator] an iterator of the found records. # # @since 3.1.0 def pages raw_pages.lazy.map do |items, options| models = items.map { |i| source.from_database(i) } models.each { |m| m.run_callbacks :find } [models, options] end.each end # Pages of items before type casting def raw_pages if @key_fields_detector.key_present? raw_pages_via_query else issue_scan_warning if Dynamoid::Config.warn_on_scan && !@where_conditions.empty? raw_pages_via_scan end end # If the query matches an index, we'll query the associated table to find results. # # @return [Enumerator] an iterator of the found pages. An array of records # # @since 3.1.0 def raw_pages_via_query Enumerator.new do |y| Dynamoid.adapter.query(source.table_name, query_key_conditions, query_non_key_conditions, query_options).each do |items, metadata| options = metadata.slice(:last_evaluated_key) y.yield items, options end end end # If the query does not match an index, we'll manually scan the associated table to find results. # # @return [Enumerator] an iterator of the found pages. An array of records # # @since 3.1.0 def raw_pages_via_scan Enumerator.new do |y| Dynamoid.adapter.scan(source.table_name, scan_conditions, scan_options).each do |items, metadata| options = metadata.slice(:last_evaluated_key) y.yield items, options end end end def issue_scan_warning Dynamoid.logger.warn 'Queries without an index are forced to use scan and are generally much slower than indexed queries!' Dynamoid.logger.warn "You can index this query by adding index declaration to #{source.to_s.underscore}.rb:" Dynamoid.logger.warn "* global_secondary_index hash_key: 'some-name', range_key: 'some-another-name'" Dynamoid.logger.warn "* local_secondary_index range_key: 'some-name'" Dynamoid.logger.warn "Not indexed attributes: #{@where_conditions.keys.sort.collect { |name| ":#{name}" }.join(', ')}" end def count_via_query Dynamoid.adapter.query_count(source.table_name, query_key_conditions, query_non_key_conditions, query_options) end def count_via_scan Dynamoid.adapter.scan_count(source.table_name, scan_conditions, scan_options) end def field_condition(key, value_before_type_casting) name, operator = key.to_s.split('.') value = type_cast_condition_parameter(name, value_before_type_casting) operator ||= 'eq' unless operator.in? ALLOWED_FIELD_OPERATORS raise Dynamoid::Errors::Error, "Unsupported operator #{operator} in #{key}" end condition = case operator # NULL/NOT_NULL operators don't have parameters # So { null: true } means NULL check and { null: false } means NOT_NULL one # The same logic is used for { not_null: BOOL } when 'null' value ? [:null, nil] : [:not_null, nil] when 'not_null' value ? [:not_null, nil] : [:null, nil] else [operator.to_sym, value] end [name.to_sym, condition] end def query_key_conditions opts = {} # Add hash key # TODO: always have hash key in @where_conditions? _, condition = field_condition(@key_fields_detector.hash_key, @where_conditions[@key_fields_detector.hash_key]) opts[@key_fields_detector.hash_key] = [condition] # Add range key if @key_fields_detector.range_key if @where_conditions[@key_fields_detector.range_key].present? _, condition = field_condition(@key_fields_detector.range_key, @where_conditions[@key_fields_detector.range_key]) opts[@key_fields_detector.range_key] = [condition] end @where_conditions.keys.select { |k| k.to_s =~ /^#{@key_fields_detector.range_key}\./ }.each do |key| name, condition = field_condition(key, @where_conditions[key]) opts[name] ||= [] opts[name] << condition end end opts end def query_non_key_conditions opts = {} # Honor STI and :type field if it presents if @source.attributes.key?(@source.inheritance_field) && @key_fields_detector.hash_key.to_sym != @source.inheritance_field.to_sym @where_conditions.update(sti_condition) end # TODO: Separate key conditions and non-key conditions properly: # only =, >, >=, <, <=, between and begins_with # could be used for sort key in KeyConditionExpression keys = (@where_conditions.keys.map(&:to_sym) - [@key_fields_detector.hash_key.to_sym, @key_fields_detector.range_key.try(:to_sym)]) .reject { |k, _| k.to_s =~ /^#{@key_fields_detector.range_key}\./ } keys.each do |key| name, condition = field_condition(key, @where_conditions[key]) opts[name] ||= [] opts[name] << condition end opts end # TODO: casting should be operator aware # e.g. for NULL operator value should be boolean # and isn't related to an attribute own type def type_cast_condition_parameter(key, value) return value if %i[array set].include?(source.attributes[key.to_sym][:type]) if [true, false].include?(value) # Support argument for null/not_null operators value elsif !value.respond_to?(:to_ary) options = source.attributes[key.to_sym] value_casted = TypeCasting.cast_field(value, options) Dumping.dump_field(value_casted, options) else value.to_ary.map do |el| options = source.attributes[key.to_sym] value_casted = TypeCasting.cast_field(el, options) Dumping.dump_field(value_casted, options) end end end # Start key needs to be set up based on the index utilized # If using a secondary index then we must include the index's composite key # as well as the tables composite key. def start_key return @start if @start.is_a?(Hash) hash_key = @key_fields_detector.hash_key || source.hash_key range_key = @key_fields_detector.range_key || source.range_key key = {} key[hash_key] = type_cast_condition_parameter(hash_key, @start.send(hash_key)) if range_key key[range_key] = type_cast_condition_parameter(range_key, @start.send(range_key)) end # Add table composite keys if they differ from secondary index used composite key if hash_key != source.hash_key key[source.hash_key] = type_cast_condition_parameter(source.hash_key, @start.hash_key) end if source.range_key && range_key != source.range_key key[source.range_key] = type_cast_condition_parameter(source.range_key, @start.range_value) end key end def query_options opts = {} # Don't specify select = ALL_ATTRIBUTES option explicitly because it's # already a default value of Select statement. Explicite Select value # conflicts with AttributesToGet statement (project option). opts[:index_name] = @key_fields_detector.index_name if @key_fields_detector.index_name opts[:record_limit] = @record_limit if @record_limit opts[:scan_limit] = @scan_limit if @scan_limit opts[:batch_size] = @batch_size if @batch_size opts[:exclusive_start_key] = start_key if @start opts[:scan_index_forward] = @scan_index_forward opts[:project] = @project opts[:consistent_read] = true if @consistent_read opts end def scan_conditions # Honor STI and :type field if it presents if sti_condition @where_conditions.update(sti_condition) end {}.tap do |opts| @where_conditions.keys.map(&:to_sym).each do |key| name, condition = field_condition(key, @where_conditions[key]) opts[name] ||= [] opts[name] << condition end end end def scan_options opts = {} opts[:index_name] = @key_fields_detector.index_name if @key_fields_detector.index_name opts[:record_limit] = @record_limit if @record_limit opts[:scan_limit] = @scan_limit if @scan_limit opts[:batch_size] = @batch_size if @batch_size opts[:exclusive_start_key] = start_key if @start opts[:consistent_read] = true if @consistent_read opts[:project] = @project opts end # TODO: return Array, not String def sti_condition condition = {} type = @source.inheritance_field if @source.attributes.key?(type) && !@source.abstract_class? sti_names = @source.deep_subclasses.map(&:sti_name) << @source.sti_name condition[:"#{type}.in"] = sti_names end condition end end end end