Sha256: a88c587a3b6083bedd9b3d0a673ffe0457bd2c8f84b802c8062a731e228ac75c

Contents?: true

Size: 1.24 KB

Versions: 2

Compression:

Stored size: 1.24 KB

Contents

module Smash
  module CloudPowers
    module Synapse
      module Pipe
        # A Resource that represents a single Kinesis stream.  It holds the
        # Kinesis client used to talk to AWS and the number of shards the
        # stream should be created with.
        class Stream < Smash::CloudPowers::Resource
          # Seconds to pause when the stream already exists but is not yet
          # reported as 'ACTIVE'.  The log message below is built from this
          # value so the two can never disagree.
          NOT_READY_WAIT_SECONDS = 1
          private_constant :NOT_READY_WAIT_SECONDS

          attr_accessor :kinesis, :shard_count

          # Builds the stream wrapper.  All received arguments are forwarded,
          # unchanged, to the superclass initializer (zero-argument +super+).
          #
          # @param name [String] the stream name; forwarded to the superclass
          # @param client [Object] Kinesis client used for every AWS call made
          #   by this object
          # @param config [Hash] extra options; only +:shard_count+ is read
          #   here (defaults to 1 when absent or nil)
          #
          # NOTE(review): the +client:+ default calls +kinesis+ before the
          # body runs.  Because +attr_accessor :kinesis+ is declared on this
          # class, that call appears to resolve to the plain reader and so
          # yields nil on a fresh instance -- confirm whether an inherited
          # client-building +kinesis+ helper was intended instead.
          def initialize(name:, client: kinesis, **config)
            super
            @kinesis = client
            @shard_count = config[:shard_count] || 1
          end

          # Asks Kinesis to create the stream described by #config and blocks
          # on the +:stream_exists+ waiter.
          #
          # If AWS reports that the stream already exists, nothing is created:
          # when the stream is 'ACTIVE' the method returns immediately,
          # otherwise it pauses briefly and flags this object as saved and
          # linked, as if it had created the stream itself.
          #
          # @return [Boolean, nil] the create response's +successful?+ value
          #   on a fresh creation; +nil+ when the stream already exists and is
          #   'ACTIVE'; +true+ when it already exists but was not yet active
          # @raise [StandardError] any error other than
          #   +Aws::Kinesis::Errors::ResourceInUseException+ propagates to the
          #   caller untouched
          #
          # NOTE(review): the already-ACTIVE path returns nil and does not set
          # @saved/@linked, unlike the not-ready path -- confirm callers
          # expect that asymmetry.
          def create_resource
            @response = kinesis.create_stream(config)
            kinesis.wait_until(:stream_exists, stream_name: config[:stream_name])
            @response.successful? # (http request successful && stream created)?
          rescue Aws::Kinesis::Errors::ResourceInUseException
            # Only the "already exists" error is handled here; rescuing a
            # specific class keeps signals, exits and unrelated failures
            # flowing to the caller without an intermediate catch-all.
            logger.info "#{name} already created"
            # stream_status is not defined in this file; presumably provided
            # by the superclass or a mixin -- TODO confirm.
            return if stream_status == 'ACTIVE'

            logger.info "Not ready for traffic.  Wait for #{NOT_READY_WAIT_SECONDS} second(s)..."
            sleep NOT_READY_WAIT_SECONDS
            @saved = true # acts like it would if it had to create the stream
            @linked = true
          end

          # Parameters handed to the Kinesis +create_stream+ call.
          #
          # @return [Hash] +:stream_name+ (from @name) and +:shard_count+
          def config
            { stream_name: @name, shard_count: @shard_count }
          end
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
cloud_powers-1.0.1 lib/cloud_powers/synapse/pipe/stream.rb
cloud_powers-1.0.0 lib/cloud_powers/synapse/pipe/stream.rb