Sha256: a88c587a3b6083bedd9b3d0a673ffe0457bd2c8f84b802c8062a731e228ac75c
Contents?: true
Size: 1.24 KB
Versions: 2
Compression:
Stored size: 1.24 KB
Contents
module Smash module CloudPowers module Synapse module Pipe class Stream < Smash::CloudPowers::Resource attr_accessor :kinesis, :shard_count def initialize(name:, client: kinesis, **config) super @kinesis = client @shard_count = config[:shard_count] || 1 end def create_resource begin @response = kinesis.create_stream(config) kinesis.wait_until(:stream_exists, stream_name: config[:stream_name]) @response.successful? # (http request successful && stream created)? rescue Exception => e if e.kind_of? Aws::Kinesis::Errors::ResourceInUseException logger.info "#{name} already created" return if stream_status == 'ACTIVE' logger.info "Not ready for traffic. Wait for 30 seconds..." sleep 1 @saved = true # acts like it would if it had to create the stream @linked = true else raise end end end def config { stream_name: @name, shard_count: @shard_count } end end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
cloud_powers-1.0.1 | lib/cloud_powers/synapse/pipe/stream.rb |
cloud_powers-1.0.0 | lib/cloud_powers/synapse/pipe/stream.rb |