lib/fluent/plugin/in_dynamodb_streams.rb in fluent-plugin-dynamodb-streams-0.0.2 vs lib/fluent/plugin/in_dynamodb_streams.rb in fluent-plugin-dynamodb-streams-0.0.3
- old
+ new
@@ -42,11 +42,11 @@
super
unless @pos_file
@pos_memory = {}
end
-
+
options = {}
options[:region] = @aws_region if @aws_region
options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key
options[:endpoint] = @stream_endpoint
@client = Aws::DynamoDBStreams::Client.new(options)
@@ -64,14 +64,11 @@
def run
while @running
sleep @fetch_interval
- @client.describe_stream({
- stream_arn: @stream_arn
- }).stream_description.shards.each do |s|
-
+ get_shards.each do |s|
if s.sequence_number_range.ending_sequence_number
remove_sequence(s.shard_id)
next
end
@@ -96,9 +93,30 @@
else
@iterator.delete s.shard_id
end
end
end
+ end
+
+ def get_shards()
+ shards = []
+
+ last_shard_id = nil
+ begin
+ s = @client.describe_stream({
+ stream_arn: @stream_arn,
+ exclusive_start_shard_id: last_shard_id,
+ }).stream_description
+
+ shards = shards + s.shards
+
+ if s.last_evaluated_shard_id == last_shard_id then
+ break
+ end
+ last_shard_id = s.last_evaluated_shard_id
+ end while last_shard_id
+
+ shards
end
def set_iterator(shard_id)
if load_sequence(shard_id)
@iterator[shard_id] = @client.get_shard_iterator({