lib/ddbcli/ddb-driver.rb in ddbcli-0.1.2 vs lib/ddbcli/ddb-driver.rb in ddbcli-0.1.3

- old
+ new

@@ -21,10 +21,11 @@ end # Rownum def initialize(accessKeyId, secretAccessKey, endpoint_or_region) @client = DynamoDB::Client.new(accessKeyId, secretAccessKey, endpoint_or_region) @consistent = false + @iteratable = false end def_delegators( :@client, :endpoint, @@ -34,10 +35,11 @@ :retry_num, :'retry_num=', :retry_intvl, :'retry_intvl=', :debug, :'debug=') attr_accessor :consistent + attr_accessor :iteratable def execute(query, opts = {}) parsed, script_type, script = Parser.parse(query) command = parsed.class.name.split('::').last.to_sym @@ -84,10 +86,12 @@ if @last_action and @last_parsed and @last_evaluated_key do_select(@last_action, @last_parsed, :last_evaluated_key => @last_evaluated_key) else [] end + when :NULL + nil else raise 'must not happen' end begin @@ -307,67 +311,84 @@ def do_describe(parsed) (@client.query('DescribeTable', 'TableName' => parsed.table) || {}).fetch('Table', {}) end def do_select(action, parsed, opts = {}) - req_hash = {'TableName' => parsed.table} - req_hash['AttributesToGet'] = parsed.attrs unless parsed.attrs.empty? - req_hash['Limit'] = parsed.limit if parsed.limit - req_hash['ExclusiveStartKey'] = opts[:last_evaluated_key] if opts[:last_evaluated_key] + select_proc = lambda do |last_evaluated_key| + req_hash = {'TableName' => parsed.table} + req_hash['AttributesToGet'] = parsed.attrs unless parsed.attrs.empty? + req_hash['Limit'] = parsed.limit if parsed.limit + req_hash['ExclusiveStartKey'] = last_evaluated_key if last_evaluated_key - if action == 'Query' - req_hash['ConsistentRead'] = @consistent if @consistent - req_hash['IndexName'] = parsed.index if parsed.index - req_hash['ScanIndexForward'] = parsed.order_asc unless parsed.order_asc.nil? - end + if action == 'Query' + req_hash['ConsistentRead'] = @consistent if @consistent + req_hash['IndexName'] = parsed.index if parsed.index + req_hash['ScanIndexForward'] = parsed.order_asc unless parsed.order_asc.nil? + end - # XXX: req_hash['ReturnConsumedCapacity'] = ... + # XXX: req_hash['ReturnConsumedCapacity'] = ... - if parsed.count - req_hash['Select'] = 'COUNT' - elsif not parsed.attrs.empty? - req_hash['Select'] = 'SPECIFIC_ATTRIBUTES' - end + if parsed.count + req_hash['Select'] = 'COUNT' + elsif not parsed.attrs.empty? + req_hash['Select'] = 'SPECIFIC_ATTRIBUTES' + end - # key conditions / scan filter - if parsed.conds - param_name = (action == 'Query') ? 'KeyConditions' : 'ScanFilter' - req_hash[param_name] = {} + # key conditions / scan filter + if parsed.conds + param_name = (action == 'Query') ? 'KeyConditions' : 'ScanFilter' + req_hash[param_name] = {} - parsed.conds.each do |key, operator, values| - h = req_hash[param_name][key] = { - 'ComparisonOperator' => operator.to_s - } + parsed.conds.each do |key, operator, values| + h = req_hash[param_name][key] = { + 'ComparisonOperator' => operator.to_s + } - h['AttributeValueList'] = values.map do |val| - convert_to_attribute_value(val) + h['AttributeValueList'] = values.map do |val| + convert_to_attribute_value(val) + end end - end - end # key conditions / scan filter + end # key conditions / scan filter - res_data = nil + rd = nil - begin - res_data = @client.query(action, req_hash) - rescue DynamoDB::Error => e - if action == 'Query' and e.data['__type'] == 'com.amazon.coral.service#InternalFailure' and not (e.data['message'] || e.data['Message']) - table_info = (@client.query('DescribeTable', 'TableName' => parsed.table) || {}).fetch('Table', {}) rescue {} + begin + rd = @client.query(action, req_hash) + rescue DynamoDB::Error => e + if action == 'Query' and e.data['__type'] == 'com.amazon.coral.service#InternalFailure' and not (e.data['message'] || e.data['Message']) + table_info = (@client.query('DescribeTable', 'TableName' => parsed.table) || {}).fetch('Table', {}) rescue {} - unless table_info.fetch('KeySchema', []).any? {|i| i ||= {}; i['KeyType'] == 'RANGE' } - e.message << 'Query can be performed only on a table with a HASH,RANGE key schema' + unless table_info.fetch('KeySchema', []).any? {|i| i ||= {}; i['KeyType'] == 'RANGE' } + e.message << 'Query can be performed only on a table with a HASH,RANGE key schema' + end end + + raise e end - raise e + rd end + res_data = select_proc.call(opts[:last_evaluated_key]) retval = nil if parsed.count retval = res_data['Count'] + + while res_data['LastEvaluatedKey'] + res_data = select_proc.call(res_data['LastEvaluatedKey']) + retval += res_data['Count'] + end else retval = res_data['Items'].map {|i| convert_to_ruby_value(i) } + + if @iteratable and not parsed.limit + while res_data['LastEvaluatedKey'] + res_data = select_proc.call(res_data['LastEvaluatedKey']) + retval.concat(res_data['Items'].map {|i| convert_to_ruby_value(i) }) + end + end end if res_data['LastEvaluatedKey'] @last_action = action @last_parsed = parsed @@ -504,11 +525,11 @@ 'Key' => key_hash, }, } end - @client.query('BatchWriteItem', req_hash) + batch_write_item(req_hash) end Rownum.new(n) end @@ -629,18 +650,27 @@ parsed.attrs.zip(val_list).each do |name, val| h[name] = convert_to_attribute_value(val) end end - @client.query('BatchWriteItem', req_hash) + batch_write_item(req_hash) n += chunk.length end Rownum.new(n) end def str_to_num(str) str =~ /\./ ? str.to_f : str.to_i + end + + def batch_write_item(req_hash) + res_data = @client.query('BatchWriteItem', req_hash) + + until (res_data['UnprocessedItems'] || {}).empty? + req_hash['RequestItems'] = res_data['UnprocessedItems'] + res_data = @client.query('BatchWriteItem', req_hash) + end end end # Driver end # DynamoDB