lib/ddbcli/ddb-driver.rb in ddbcli-0.1.2 vs lib/ddbcli/ddb-driver.rb in ddbcli-0.1.3
- old
+ new
@@ -21,10 +21,11 @@
end # Rownum
def initialize(accessKeyId, secretAccessKey, endpoint_or_region)
@client = DynamoDB::Client.new(accessKeyId, secretAccessKey, endpoint_or_region)
@consistent = false
+ @iteratable = false
end
def_delegators(
:@client,
:endpoint,
@@ -34,10 +35,11 @@
:retry_num, :'retry_num=',
:retry_intvl, :'retry_intvl=',
:debug, :'debug=')
attr_accessor :consistent
+ attr_accessor :iteratable
def execute(query, opts = {})
parsed, script_type, script = Parser.parse(query)
command = parsed.class.name.split('::').last.to_sym
@@ -84,10 +86,12 @@
if @last_action and @last_parsed and @last_evaluated_key
do_select(@last_action, @last_parsed, :last_evaluated_key => @last_evaluated_key)
else
[]
end
+ when :NULL
+ nil
else
raise 'must not happen'
end
begin
@@ -307,67 +311,84 @@
def do_describe(parsed)
(@client.query('DescribeTable', 'TableName' => parsed.table) || {}).fetch('Table', {})
end
def do_select(action, parsed, opts = {})
- req_hash = {'TableName' => parsed.table}
- req_hash['AttributesToGet'] = parsed.attrs unless parsed.attrs.empty?
- req_hash['Limit'] = parsed.limit if parsed.limit
- req_hash['ExclusiveStartKey'] = opts[:last_evaluated_key] if opts[:last_evaluated_key]
+ select_proc = lambda do |last_evaluated_key|
+ req_hash = {'TableName' => parsed.table}
+ req_hash['AttributesToGet'] = parsed.attrs unless parsed.attrs.empty?
+ req_hash['Limit'] = parsed.limit if parsed.limit
+ req_hash['ExclusiveStartKey'] = last_evaluated_key if last_evaluated_key
- if action == 'Query'
- req_hash['ConsistentRead'] = @consistent if @consistent
- req_hash['IndexName'] = parsed.index if parsed.index
- req_hash['ScanIndexForward'] = parsed.order_asc unless parsed.order_asc.nil?
- end
+ if action == 'Query'
+ req_hash['ConsistentRead'] = @consistent if @consistent
+ req_hash['IndexName'] = parsed.index if parsed.index
+ req_hash['ScanIndexForward'] = parsed.order_asc unless parsed.order_asc.nil?
+ end
- # XXX: req_hash['ReturnConsumedCapacity'] = ...
+ # XXX: req_hash['ReturnConsumedCapacity'] = ...
- if parsed.count
- req_hash['Select'] = 'COUNT'
- elsif not parsed.attrs.empty?
- req_hash['Select'] = 'SPECIFIC_ATTRIBUTES'
- end
+ if parsed.count
+ req_hash['Select'] = 'COUNT'
+ elsif not parsed.attrs.empty?
+ req_hash['Select'] = 'SPECIFIC_ATTRIBUTES'
+ end
- # key conditions / scan filter
- if parsed.conds
- param_name = (action == 'Query') ? 'KeyConditions' : 'ScanFilter'
- req_hash[param_name] = {}
+ # key conditions / scan filter
+ if parsed.conds
+ param_name = (action == 'Query') ? 'KeyConditions' : 'ScanFilter'
+ req_hash[param_name] = {}
- parsed.conds.each do |key, operator, values|
- h = req_hash[param_name][key] = {
- 'ComparisonOperator' => operator.to_s
- }
+ parsed.conds.each do |key, operator, values|
+ h = req_hash[param_name][key] = {
+ 'ComparisonOperator' => operator.to_s
+ }
- h['AttributeValueList'] = values.map do |val|
- convert_to_attribute_value(val)
+ h['AttributeValueList'] = values.map do |val|
+ convert_to_attribute_value(val)
+ end
end
- end
- end # key conditions / scan filter
+ end # key conditions / scan filter
- res_data = nil
+ rd = nil
- begin
- res_data = @client.query(action, req_hash)
- rescue DynamoDB::Error => e
- if action == 'Query' and e.data['__type'] == 'com.amazon.coral.service#InternalFailure' and not (e.data['message'] || e.data['Message'])
- table_info = (@client.query('DescribeTable', 'TableName' => parsed.table) || {}).fetch('Table', {}) rescue {}
+ begin
+ rd = @client.query(action, req_hash)
+ rescue DynamoDB::Error => e
+ if action == 'Query' and e.data['__type'] == 'com.amazon.coral.service#InternalFailure' and not (e.data['message'] || e.data['Message'])
+ table_info = (@client.query('DescribeTable', 'TableName' => parsed.table) || {}).fetch('Table', {}) rescue {}
- unless table_info.fetch('KeySchema', []).any? {|i| i ||= {}; i['KeyType'] == 'RANGE' }
- e.message << 'Query can be performed only on a table with a HASH,RANGE key schema'
+ unless table_info.fetch('KeySchema', []).any? {|i| i ||= {}; i['KeyType'] == 'RANGE' }
+ e.message << 'Query can be performed only on a table with a HASH,RANGE key schema'
+ end
end
+
+ raise e
end
- raise e
+ rd
end
+ res_data = select_proc.call(opts[:last_evaluated_key])
retval = nil
if parsed.count
retval = res_data['Count']
+
+ while res_data['LastEvaluatedKey']
+ res_data = select_proc.call(res_data['LastEvaluatedKey'])
+ retval += res_data['Count']
+ end
else
retval = res_data['Items'].map {|i| convert_to_ruby_value(i) }
+
+ if @iteratable and not parsed.limit
+ while res_data['LastEvaluatedKey']
+ res_data = select_proc.call(res_data['LastEvaluatedKey'])
+ retval.concat(res_data['Items'].map {|i| convert_to_ruby_value(i) })
+ end
+ end
end
if res_data['LastEvaluatedKey']
@last_action = action
@last_parsed = parsed
@@ -504,11 +525,11 @@
'Key' => key_hash,
},
}
end
- @client.query('BatchWriteItem', req_hash)
+ batch_write_item(req_hash)
end
Rownum.new(n)
end
@@ -629,18 +650,27 @@
parsed.attrs.zip(val_list).each do |name, val|
h[name] = convert_to_attribute_value(val)
end
end
- @client.query('BatchWriteItem', req_hash)
+ batch_write_item(req_hash)
n += chunk.length
end
Rownum.new(n)
end
def str_to_num(str)
str =~ /\./ ? str.to_f : str.to_i
+ end
+
+ def batch_write_item(req_hash)
+ res_data = @client.query('BatchWriteItem', req_hash)
+
+ until (res_data['UnprocessedItems'] || {}).empty?
+ req_hash['RequestItems'] = res_data['UnprocessedItems']
+ res_data = @client.query('BatchWriteItem', req_hash)
+ end
end
end # Driver
end # DynamoDB