Sha256: cfb9621e5dcb0b8cb0b6898a0c8b4183c89cb3772626a3ff33452e9e2f69ec0c
Contents?: true
Size: 1.31 KB
Versions: 2
Compression:
Stored size: 1.31 KB
Contents
#
#  Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Licensed under the Amazon Software License (the "License").
#  You may not use this file except in compliance with the License.
#  A copy of the License is located at
#
#  http://aws.amazon.com/asl/
#
#  or in the "license" file accompanying this file. This file is distributed
#  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#  express or implied. See the License for the specific language governing
#  permissions and limitations under the License.

require 'fluent/plugin/kinesis_helper'

module Fluent
  # Buffered output plugin registered under the name 'kinesis_streams'.
  #
  # Each flushed chunk is converted to records, split into batches, and every
  # non-empty batch is submitted through client.put_records to @stream_name.
  #
  # NOTE(review): convert_to_records, split_to_batches,
  # batch_request_with_retry, data_format, key, client and
  # config_param_for_streams are not defined in this file; presumably they
  # come from KinesisHelper -- confirm there.
  class KinesisStreamsOutput < BufferedOutput
    include KinesisHelper
    Fluent::Plugin.register_output('kinesis_streams', self)
    config_param_for_streams

    # Sends the contents of a buffer chunk, one batch at a time.
    #
    # @param chunk the buffer chunk handed to this output for flushing
    # @return whatever log.debug returns
    def write(chunk)
      converted = convert_to_records(chunk)
      split_to_batches(converted).each do |group|
        # Empty batches are skipped rather than sent.
        batch_request_with_retry(group) if group.size > 0
      end
      log.debug("Written #{converted.size} records")
    end

    private

    # Builds the request entry for a single event.
    #
    # @param tag the event tag
    # @param time the event time
    # @param record the event record
    # @return [Hash] entry with :data and :partition_key keys
    def convert_format(tag, time, record)
      { data: data_format(tag, time, record), partition_key: key(record) }
    end

    # Submits one batch of entries with a single put_records call.
    # NOTE(review): presumably invoked by batch_request_with_retry -- confirm.
    #
    # @param batch the entries to send
    # @return the value returned by client.put_records
    def batch_request(batch)
      client.put_records(stream_name: @stream_name, records: batch)
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-kinesis-1.3.0 | lib/fluent/plugin/out_kinesis_streams.rb |
fluent-plugin-kinesis-1.2.0 | lib/fluent/plugin/out_kinesis_streams.rb |