Sha256: 00ade4d263d380d39898138343477f8edea35bad6464d564d2311bd6c4b585ac

Contents?: true

Size: 1.47 KB

Versions: 3

Compression:

Stored size: 1.47 KB

Contents

#
#  Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Licensed under the Amazon Software License (the "License").
#  You may not use this file except in compliance with the License.
#  A copy of the License is located at
#
#  http://aws.amazon.com/asl/
#
#  or in the "license" file accompanying this file. This file is distributed
#  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#  express or implied. See the License for the specific language governing
#  permissions and limitations under the License.

require 'fluent/plugin/kinesis_helper'

module Fluent
  class KinesisProducerOutput < BufferedOutput
    include KinesisHelper
    Fluent::Plugin.register_output('kinesis_producer', self)
    config_param_for_producer

    def configure(conf)
      super
      unless @stream_name or @stream_name_prefix
        raise Fluent::ConfigError, "'stream_name' or 'stream_name_prefix' is required"
      end
      if @stream_name and @stream_name_prefix
        raise Fluent::ConfigError, "Only one of 'stream_name' or 'stream_name_prefix' is allowed"
      end
    end

    def write(chunk)
      records = convert_to_records(chunk)
      wait_futures(write_chunk_to_kpl(records))
    end

    private

    def convert_format(tag, time, record)
      {
        data: data_format(tag, time, record),
        partition_key: key(record),
        stream_name: @stream_name ? @stream_name : @stream_name_prefix + tag,
      }
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fluent-plugin-kinesis-1.1.2 lib/fluent/plugin/out_kinesis_producer.rb
fluent-plugin-kinesis-1.1.1 lib/fluent/plugin/out_kinesis_producer.rb
fluent-plugin-kinesis-1.1.0 lib/fluent/plugin/out_kinesis_producer.rb