Sha256: 00fee17fa420d9f1601dfbf542d7bbaa804f3ec040b8fad895d2793eaf4f9dac

Contents?: true

Size: 1.95 KB

Versions: 7

Compression:

Stored size: 1.95 KB

Contents

module Telekinesis
  module Aws
    # NOTE: the cause is wrapped explicitly because JRuby isn't Ruby 2.1
    # compatible (yet) and so lacks the built-in Exception#cause.
    class KinesisError < RuntimeError
      attr_reader :cause

      def initialize(cause)
        @cause = cause
      end
    end

    # Base class for other ClientAdapters. Client adapters exist to make
    # switching between platforms easy and painless.
    #
    # The base adapter defines the interface and provides convenience methods.
    class ClientAdapter
      # Build a new client given AWS credentials.
      #
      # Credentials must be supplied as a hash that contains symbolized
      # :access_key_id and :secret_access_key keys.
      def self.build(credentials)
        raise NotImplementedError
      end

      def initialize(client)
        @client = client
      end

      # Make a put_record call to the underlying client. Must return an object
      # that responds to `shard_id` and `sequence_number`.
      def put_record(stream, key, value)
        raise NotImplementedError
      end

      # Make a put_records call to the underlying client. If the request
      # succeeds but returns errors for some records, each failed record is
      # returned as a [key, value, error_code, error_message] array. Returns
      # an empty array when every record succeeds.
      def put_records(stream, items)
        response = do_put_records(stream, items)
        failures = items.zip(response).reject { |_, r| r.error_code.nil? }

        failures.map do |(k, v), r|
          [k, v, r.error_code, r.error_message]
        end
      end

      protected

      # Put an enumerable of [key, value] pairs to the given stream. Returns an
      # enumerable of response objects, one per item and in the same order as
      # the given items, since put_records pairs them up by position.
      #
      # Response objects must respond to `error_code` and `error_message`. Any
      # response with a nil error_code is considered successful.
      def do_put_records(stream, items)
        raise NotImplementedError
      end
    end
  end
end
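
The contract between put_records and do_put_records can be illustrated with a small in-memory adapter. This is only a sketch under stated assumptions: FakeAdapter, FakeResponse, the stream name, and the error strings are hypothetical and not part of the gem, and the base class is repeated in condensed form only so the example runs on its own.

```ruby
# Condensed copy of the base class from the listing, so this sketch is
# self-contained.
module Telekinesis
  module Aws
    class ClientAdapter
      def initialize(client)
        @client = client
      end

      def put_records(stream, items)
        response = do_put_records(stream, items)
        failures = items.zip(response).reject { |_, r| r.error_code.nil? }
        failures.map { |(k, v), r| [k, v, r.error_code, r.error_message] }
      end

      protected

      def do_put_records(stream, items)
        raise NotImplementedError
      end
    end
  end
end

# Hypothetical response type. The base class only needs error_code and
# error_message.
FakeResponse = Struct.new(:error_code, :error_message)

# Hypothetical adapter. Its "client" is just an array of keys that should
# fail, which stands in for a real Kinesis client.
class FakeAdapter < Telekinesis::Aws::ClientAdapter
  protected

  # Returns one response per item, in the same order as the items.
  def do_put_records(stream, items)
    items.map do |key, _value|
      if @client.include?(key)
        FakeResponse.new("ProvisionedThroughputExceededException", "rate exceeded")
      else
        FakeResponse.new(nil, nil) # nil error_code marks success
      end
    end
  end
end

adapter = FakeAdapter.new(["b"])
failed = adapter.put_records("some-stream", [["a", "1"], ["b", "2"], ["c", "3"]])
p failed
```

Only the record with key "b" comes back, as a [key, value, error_code, error_message] array, so a caller could retry just the first two elements of each returned entry.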

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
telekinesis-3.2.1-java lib/telekinesis/aws/client_adapter.rb
telekinesis-3.2.0-java lib/telekinesis/aws/client_adapter.rb
telekinesis-3.1.1-java lib/telekinesis/aws/client_adapter.rb
telekinesis-3.1.0-java lib/telekinesis/aws/client_adapter.rb
telekinesis-3.0.0-java lib/telekinesis/aws/client_adapter.rb
telekinesis-2.0.1-java lib/telekinesis/aws/client_adapter.rb
telekinesis-2.0.0-java lib/telekinesis/aws/client_adapter.rb