Sha256: 758e84e2c7e3b43fb04f04aebe071e576e2afbef4a3997a7c440b6212268cb67

Contents?: true

Size: 1.96 KB

Versions: 6

Compression:

Stored size: 1.96 KB

Contents

module Telekinesis
  module Producer
    # A synchronous Kinesis producer.
    #
    # This class is thread safe if and only if the underlying
    # Telekines::Aws::Client is threadsafe. In practice, this means this client
    # is threadsafe on JRuby and not thread safe elsewhere.
    class SyncProducer
      attr_reader :stream, :client

      # Create a new Producer.
      #
      # AWS credentials may be specified by using the `:credentials` option and
      # passing a hash containing your `:access_key_id` and `:secret_access_key`.
      # If unspecified, credentials will be fetched from the environment, an
      # ~/.aws/credentials file, or the current instance metadata.
      #
      # `:send_size` may also be used to configure the maximum batch size used
      # in `put_all`. See `put_all` for more info.
      def self.create(options = {})
        stream = options[:stream]
        client = Telekinesis::Aws::Client.build(options.fetch(:credentials, {}))
        new(stream, client, failure_handler, options)
      end

      def initialize(stream, client, opts = {})
        @stream = stream or raise ArgumentError, "stream may not be nil"
        @client = client or raise ArgumentError, "client may not be nil"
        @send_size = opts.fetch(:send_size, Telekinesis::Aws::KINESIS_MAX_PUT_RECORDS_SIZE)
      end

      # Put an individual k, v pair to Kinesis immediately. Both k and v must
      # be strings.
      #
      # Returns once the call to Kinesis is complete.
      def put(key, data)
        @client.put_record(@stream, key, data)
      end

      # Put all of the [k, v] pairs to Kinesis in as few requests as possible.
      # All of the ks and vs must be strings.
      #
      # Each request sends at most `:send_size` records. By default this is the
      # Kinesis API limit of 500 records.
      def put_all(items)
        items.each_slice(@send_size).flat_map do |batch|
          @client.put_records(@stream, batch)
        end
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
telekinesis-3.2.0-java lib/telekinesis/producer/sync_producer.rb
telekinesis-3.1.1-java lib/telekinesis/producer/sync_producer.rb
telekinesis-3.1.0-java lib/telekinesis/producer/sync_producer.rb
telekinesis-3.0.0-java lib/telekinesis/producer/sync_producer.rb
telekinesis-2.0.1-java lib/telekinesis/producer/sync_producer.rb
telekinesis-2.0.0-java lib/telekinesis/producer/sync_producer.rb