lib/fluent/plugin/in_dynamodb_streams.rb in fluent-plugin-dynamodb-streams-0.0.2 vs lib/fluent/plugin/in_dynamodb_streams.rb in fluent-plugin-dynamodb-streams-0.0.3

- old
+ new

@@ -42,11 +42,11 @@ super unless @pos_file @pos_memory = {} end - + options = {} options[:region] = @aws_region if @aws_region options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key options[:endpoint] = @stream_endpoint @client = Aws::DynamoDBStreams::Client.new(options) @@ -64,14 +64,11 @@ def run while @running sleep @fetch_interval - @client.describe_stream({ - stream_arn: @stream_arn - }).stream_description.shards.each do |s| - + get_shards.each do |s| if s.sequence_number_range.ending_sequence_number remove_sequence(s.shard_id) next end @@ -96,9 +93,30 @@ else @iterator.delete s.shard_id end end end + end + + def get_shards() + shards = [] + + last_shard_id = nil + begin + s = @client.describe_stream({ + stream_arn: @stream_arn, + exclusive_start_shard_id: last_shard_id, + }).stream_description + + shards = shards + s.shards + + if s.last_evaluated_shard_id == last_shard_id then + break + end + last_shard_id = s.last_evaluated_shard_id + end while last_shard_id + + shards end def set_iterator(shard_id) if load_sequence(shard_id) @iterator[shard_id] = @client.get_shard_iterator({