#
# Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

# SecureRandom is used below for random partition keys; require it here
# rather than relying on another file having loaded it first.
require 'securerandom'
require 'fluent/plugin/kinesis'
require 'fluent/plugin/kinesis_helper/aggregator'

module Fluent
  module Plugin
    # Output plugin registered as 'kinesis_streams_aggregated'.
    #
    # Every batch of formatted records is combined by the aggregator into a
    # single payload and sent through one `put_records` call that carries
    # exactly one record.
    class KinesisStreamsAggregatedOutput < KinesisOutput
      Fluent::Plugin.register_output('kinesis_streams_aggregated', self)
      include KinesisHelper::Aggregator::Mixin

      # These constants are defined before BatchRequest is mixed in; keep
      # this order (presumably the mixin reads them -- confirm in
      # KinesisHelper::API::BatchRequest).
      RequestType = :streams_aggregated
      BatchRequestLimitCount = 100_000
      BatchRequestLimitSize  = 1024 * 1024
      include KinesisHelper::API::BatchRequest

      config_param :stream_name, :string
      # When nil, a fresh random partition key is generated for each batch.
      config_param :fixed_partition_key, :string, default: nil

      # Runs the parent configuration, builds the partition key generator and
      # then shrinks both size limits by the aggregation overhead (#offset).
      #
      # @param conf the plugin configuration, passed through to the parent
      def configure(conf)
        super
        # Must be assigned before #offset is called: #offset invokes it.
        @partition_key_generator = create_partition_key_generator
        @batch_request_max_size -= offset
        @max_record_size -= offset
      end

      # Formats one event as a single-element array holding the output of
      # the configured data formatter, via `format_for_api`.
      #
      # @param tag the event tag
      # @param time the event time
      # @param record the event record
      # @return whatever `format_for_api` returns for the block
      def format(tag, time, record)
        format_for_api do
          [@data_formatter.call(tag, time, record)]
        end
      end

      # Sends a chunk to the stream, one aggregated record per batch.
      #
      # @param chunk the buffer chunk to flush; also used to resolve
      #   placeholders in the configured stream name
      def write(chunk)
        stream_name = extract_placeholders(@stream_name, chunk)
        write_records_batch(chunk, stream_name) do |batch|
          key = @partition_key_generator.call
          # Each batch entry is destructured to its first element, i.e. the
          # data placed in the one-element array by #format.
          records = batch.map { |(data)| data }
          client.put_records(
            stream_name: stream_name,
            records: [{
              partition_key: key,
              data: aggregator.aggregate(records, key),
            }],
          )
        end
      end

      # Overhead subtracted from the size limits in #configure: the
      # AggregateOffset constant plus twice the partition key's byte length.
      # The key is used twice in #write (as `partition_key` and as an
      # argument to the aggregator), which presumably explains the factor
      # of two.
      #
      # The key is measured with `bytesize`, not `size`: the limits being
      # reduced are byte budgets, so a multibyte `fixed_partition_key` would
      # otherwise be under-counted. For ASCII keys (including the generated
      # hex keys) both measures are equal.
      #
      # @return [Integer] the memoized overhead
      def offset
        @offset ||= AggregateOffset + @partition_key_generator.call.bytesize * 2
      end

      private

      # Adds RecordOffset to the size computed by the parent implementation.
      #
      # @param record the record to measure
      # @return the parent's size plus RecordOffset
      def size_of_values(record)
        super(record) + RecordOffset
      end

      # Builds the lambda that yields a partition key on each call.
      #
      # @return [Proc] returns `fixed_partition_key` when one is configured,
      #   otherwise a new 32-character random hex string per call
      def create_partition_key_generator
        return -> { SecureRandom.hex(16) } if @fixed_partition_key.nil?

        -> { @fixed_partition_key }
      end
    end
  end
end