lib/rom/dynamo/relation.rb in rom-dynamo-0.1.4 vs lib/rom/dynamo/relation.rb in rom-dynamo-0.14.0
- removed (present only in 0.1.4)
+ added (present only in 0.14.0)
@@ -1,87 +1,111 @@
module Rom
module Dynamo
class Relation < ROM::Relation
include Enumerable
- forward :restrict, :index_restrict
-
- def insert(*args)
- dataset.insert(*args)
- self
- end
-
- def delete(*args)
- dataset.delete(*args)
- self
- end
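+ # Reads are now forwarded to the dataset; the relation-level insert/delete wrappers are gone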
+ forward :restrict, :batch_restrict, :index_restrict
+ forward :limit, :reversed
+ adapter :dynamo
end
class Dataset
- include Equalizer.new(:name, :connection)
- attr_reader :name, :connection
+ include Enumerable
+ include Dry::Equalizer(:name, :connection)
+ extend Dry::Initializer[undefined: false]
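+ # Default query: no key conditions, frozen so the shared default is never mutated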
+ EmptyQuery = { key_conditions: {}.freeze }.freeze
+
+ option :connection
+ option :name, proc(&:to_s)
+ option :table_keys, optional: true, reader: false
+ option :query, default: proc { EmptyQuery }, reader: false
alias_method :ddb, :connection
- def initialize(name, ddb, conditions = nil)
- @name, @connection = name, ddb
- @conditions = conditions || {}
- end
-
############# READ #############
+
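+ # With a block, run a consistent-read query and yield every item, page by page;
+ # without a block, return an Enumerator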
def each(&block)
- each_item({
- consistent_read: true,
- key_conditions: @conditions
- }, &block)
+ block.nil? ? to_enum : begin
+ result = start_query(consistent_read: true)
+ result.each_page { |p| p[:items].each(&block) }
+ end
end
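+ # Merge equality conditions into the query; a nil query is a no-op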
def restrict(query = nil)
return self if query.nil?
- conds = query_to_conditions(query)
- conds = @conditions.merge(conds)
- dup_as(Dataset, conditions: conds)
+ dup_with_query(Dataset, query)
end
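+ # Map an array of key values to a BatchGetItem dataset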
+ def batch_restrict(keys)
+ dup_as(BatchGetDataset, keys: keys.map do |k|
+ Hash[table_keys.zip(k.is_a?(Array) ? k : [k])]
+ end)
+ end
+
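+ # Query through a Global Secondary Index, selected by name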
def index_restrict(index, query)
- conds = query_to_conditions(query)
- conds = @conditions.merge(conds)
- dup_as(GlobalIndexDataset, index: index, conditions: conds)
+ dup_with_query(GlobalIndexDataset, query, index_name: index.to_s)
end
+ ############# PAGINATION #############
+
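+ # Cap the number of items DynamoDB evaluates (the Limit parameter)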
+ def limit(limit)
+ dup_with_query(self.class, nil, limit: limit.to_i)
+ end
+
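+ # Walk the key range in descending order (scan_index_forward: false)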
+ def reversed
+ dup_with_query(self.class, nil, scan_index_forward: false)
+ end
+
############# WRITE #############
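+ # PutItem with stringified attribute names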
def insert(hash)
- connection.put_item({
- table_name: name,
- item: hash
- })
+ opts = { table_name: name, item: stringify_keys(hash) }
+ connection.put_item(opts).attributes
end
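+ # DeleteItem keyed by the table's key schema, guarded by expected values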
def delete(hash)
+ hash = stringify_keys(hash)
connection.delete_item({
table_name: name,
key: hash_to_key(hash),
expected: to_expected(hash),
- })
+ }).attributes
end
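+ # UpdateItem: PUT every non-key attribute, serializing time values first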
+ def update(keys, hash)
+ connection.update_item({
+ table_name: name, key: hash_to_key(stringify_keys(keys)),
+ attribute_updates: hash.each_with_object({}) do |(k, v), out|
+ out[k] = { value: dump_value(v), action: 'PUT' } if !keys[k]
+ end
+ }).attributes
+ end
+
############# HELPERS #############
private
- def each_item(options, &block)
- puts "Querying #{name} ...\nWith: #{options.inspect}"
- connection.query(options.merge({
- table_name: name
- })).each_page do |page|
- page[:items].each(&block)
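+ # Page through BatchGetItem responses; does nothing for an empty key list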
+ def batch_get_each_item(keys, &block)
+ !keys.empty? && ddb.batch_get_item({
+ request_items: { name => { keys: keys } },
+ }).each_page do |page|
+ out = page[:responses][name]
+ out.each(&block)
end
end
- def query_to_conditions(query)
- Hash[query.map do |key, value|
- [key, {
- attribute_value_list: [value],
- comparison_operator: "EQ"
- }]
- end]
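+ # Dup into klass, merging extra options and EQ key conditions into a frozen query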
+ def dup_with_query(klass, key_hash, opts = {})
+ opts = @query.merge(opts)
+
+ if key_hash && !key_hash.empty?
+ conditions = @query[:key_conditions]
+ opts[:key_conditions] = conditions.merge(Hash[
+ key_hash.map do |key, value|
+ [key, {
+ attribute_value_list: [value],
+ comparison_operator: "EQ"
+ }]
+ end
+ ]).freeze
+ end
+
+ dup_as(klass, query: opts.freeze)
end
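+ # Build the expected-values guard used by delete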
def to_expected(hash)
hash && Hash[hash.map do |k, v|
[k, { value: v }]
@@ -94,47 +118,65 @@
end
end
def table_keys
@table_keys ||= begin
- resp = ddb.describe_table(table_name: name)
- keys = resp.first[:table][:key_schema]
- keys.map(&:attribute_name)
+ r = ddb.describe_table(table_name: name)
+ r[:table][:key_schema].map(&:attribute_name)
end
end
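+ # Run the accumulated query against this table (with a debug log line)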
+ def start_query(opts = {}, &block)
+ opts = @query.merge(table_name: name).merge!(opts)
+ puts "Querying DDB: #{opts.inspect}"
+ ddb.query(opts)
+ end
+
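+ # Re-instantiate as klass, carrying over this dataset's dry-initializer options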
def dup_as(klass, opts = {})
table_keys # To populate keys once at top-level Dataset
- vars = [:@name, :@connection, :@conditions, :@table_keys]
- klass.allocate.tap do |out|
- vars.each { |k| out.instance_variable_set(k, instance_variable_get(k)) }
- opts.each { |k, v| out.instance_variable_set("@#{k}", v) }
- end
+ attrs = Dataset.dry_initializer.attributes(self)
+ klass.new(attrs.merge(opts))
end
+
+ # String modifiers
+ def stringify_keys(hash)
+ hash.each_with_object({}) { |(k, v), out| out[k.to_s] = v }
+ end
+
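+ # Store Time/DateTime values as UTC ISO-8601 strings with microsecond precision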
+ def dump_value(v)
+ return v.new_offset(0).iso8601(6) if v.is_a?(DateTime)
+ v.is_a?(Time) ? v.utc.iso8601(6) : v
+ end
end
- # Dataset queried via a Global Index
- class GlobalIndexDataset < Dataset
- attr_accessor :index
+ # Batch get using an array of key queries
+ # [{ key => val }, { key => val }, ...]
+ class BatchGetDataset < Dataset
+ option :keys
+ # Query for records
def each(&block)
- # Pull record IDs from Global Index
- keys = []; each_item({
- key_conditions: @conditions,
- index_name: @index
- }) { |hash| keys << hash_to_key(hash) }
+ batch_get_each_item(@keys, &block)
+ end
+ end
- # Bail if we have nothing
- return if keys.empty?
-
- # Query for the actual records
- ddb.batch_get_item({
- request_items: { name => { keys: keys } },
- }).each_page do |page|
- out = page[:responses][name]
- out.each(&block)
+ # Dataset queried via a Global Secondary Index
+ # Paginate through keys from Global Index and
+ # call BatchGetItem for keys from each page
+ class GlobalIndexDataset < Dataset
+ def each(&block)
+ if @query[:limit]
+ each_item(start_query, &block)
+ else
+ result = start_query(limit: 100)
+ result.each_page { |p| each_item(p, &block) }
end
end
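+ # Map a page of index items to table keys, then fetch full records via BatchGetItem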
+ private def each_item(result, &block)
+ keys = result[:items].map { |h| hash_to_key(h) }
+ batch_get_each_item(keys, &block)
+ end
end
+
end
end
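
A minimal usage sketch of the reworked 0.14.0 dataset chain. This assumes an aws-sdk
DynamoDB client and an illustrative `users` table whose hash key is `id`; the table,
attribute names, and values here are hypothetical, not from the gem itself:

    require "aws-sdk-dynamodb"

    ddb = Aws::DynamoDB::Client.new(region: "us-east-1")
    users = Rom::Dynamo::Dataset.new(connection: ddb, name: :users)

    # Reads chain immutably: each call dups the dataset with a merged, frozen query
    users.restrict(id: "42").limit(10).reversed.each { |item| p item }

    # One BatchGetItem round trip for several keys
    users.batch_restrict(%w[42 43]).to_a

    # Writes stay on the dataset
    users.insert("id" => "44", "name" => "Ada")
    users.update({ "id" => "44" }, name: "Ada Lovelace")
    users.delete("id" => "44")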