Sha256: cfb9621e5dcb0b8cb0b6898a0c8b4183c89cb3772626a3ff33452e9e2f69ec0c

Contents?: true

Size: 1.31 KB

Versions: 2

Compression:

Stored size: 1.31 KB

Contents

#
#  Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Licensed under the Amazon Software License (the "License").
#  You may not use this file except in compliance with the License.
#  A copy of the License is located at
#
#  http://aws.amazon.com/asl/
#
#  or in the "license" file accompanying this file. This file is distributed
#  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#  express or implied. See the License for the specific language governing
#  permissions and limitations under the License.

require 'fluent/plugin/kinesis_helper'

module Fluent
  # Buffered output plugin registered with Fluentd under the name
  # 'kinesis_streams'.
  #
  # The conversion, batching, retry and client helpers called below are not
  # defined in this file (presumably supplied by KinesisHelper -- confirm
  # there). This class contributes only the per-record layout and the
  # put_records request itself.
  class KinesisStreamsOutput < BufferedOutput
    include KinesisHelper
    Fluent::Plugin.register_output('kinesis_streams', self)
    config_param_for_streams

    # Flushes one buffer chunk.
    #
    # The chunk is turned into records, the records are split into batches,
    # and every batch that holds at least one record is submitted through
    # batch_request_with_retry. The total record count is logged at debug
    # level once all batches have been handed off.
    #
    # @param chunk the buffer chunk passed in by the caller
    def write(chunk)
      records = convert_to_records(chunk)
      batches = split_to_batches(records)
      batches.each do |batch|
        # Empty batches are skipped rather than sent.
        batch_request_with_retry(batch) if batch.size > 0
      end
      log.debug("Written #{records.size} records")
    end

    private

    # Builds the hash for a single event: the formatted payload under :data
    # and the value returned by key(record) under :partition_key.
    def convert_format(tag, time, record)
      payload = data_format(tag, time, record)
      partition = key(record)
      { data: payload, partition_key: partition }
    end

    # Sends one batch to the stream named by @stream_name via
    # client.put_records and returns whatever that call returns.
    def batch_request(batch)
      client.put_records(stream_name: @stream_name, records: batch)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluent-plugin-kinesis-1.3.0 lib/fluent/plugin/out_kinesis_streams.rb
fluent-plugin-kinesis-1.2.0 lib/fluent/plugin/out_kinesis_streams.rb