lib/jets/job/dsl/dynamodb_event.rb in jets-4.0.12 vs lib/jets/job/dsl/dynamodb_event.rb in jets-5.0.0.beta1
- old
+ new
@@ -1,28 +1,38 @@
module Jets::Job::Dsl
module DynamodbEvent
- def dynamodb_event(table_name, options={})
+ def dynamodb_event(table_name_without_namespace, options={})
return if ENV['JETS_BUILD_NO_INTERNET'] # Disable during build since jets build tries to init this
+ table_name = add_dynamodb_table_namespace(table_name_without_namespace)
stream_arn = full_dynamodb_stream_arn(table_name)
default_iam_policy = default_dynamodb_stream_policy(stream_arn)
# Create iam policy allows access to queue
# Allow disabling in case use wants to add permission application-wide and not have extra IAM policy
iam_policy_props = options.delete(:iam_policy) || @iam_policy || default_iam_policy
iam_policy(iam_policy_props) unless iam_policy_props == :disable
props = options # by this time options only has EventSourceMapping properties
default = {
- event_source_arn: stream_arn,
- starting_position: "TRIM_HORIZON",
+ EventSourceArn: stream_arn,
+ StartingPosition: "TRIM_HORIZON",
}
props = default.merge(props)
event_source_mapping(props)
end
+ def add_dynamodb_table_namespace(table_name_without_namespace)
+ ns = if Jets.config.events.dynamodb.table_namespace == true
+ Jets.table_namespace # does not include extra
+ elsif Jets.config.events.dynamodb.table_namespace
+ Jets.config.events.dynamodb.table_namespace # allow user to fully control namespace
+ end
+ ns_separator = Jets.config.events.dynamodb.table_namespace_separator
+ [ns, table_name_without_namespace].compact.join(ns_separator)
+ end
# Expands table name to the full stream arn. Example:
#
# test-table
# To:
@@ -46,21 +56,21 @@
return stream_arn if stream_arn
end
def default_dynamodb_stream_policy(stream_name_arn='*')
stream = {
- action: ["dynamodb:GetRecords",
+ Action: ["dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"],
- effect: "Allow",
- resource: stream_name_arn,
+ Effect: "Allow",
+ Resource: stream_name_arn,
}
table_name_arn = stream_name_arn.gsub(%r{/stream/20.*},'')
table = {
- action: ["dynamodb:DescribeTable"],
- effect: "Allow",
- resource: table_name_arn,
+ Action: ["dynamodb:DescribeTable"],
+ Effect: "Allow",
+ Resource: table_name_arn,
}
[stream, table]
end
end
end
\ No newline at end of file