lib/logstash-logger/device/kinesis.rb in logstash-logger-0.23.0 vs lib/logstash-logger/device/kinesis.rb in logstash-logger-0.24.0

- old
+ new

@@ -1,80 +1,39 @@ require 'aws-sdk' +require 'logstash-logger/device/aws_stream' module LogStashLogger module Device - class Kinesis < Connectable - - DEFAULT_REGION = 'us-east-1' - DEFAULT_STREAM = 'logstash' - RECOVERABLE_ERROR_CODES = [ + class Kinesis < AwsStream + @stream_class = ::Aws::Kinesis::Client + @recoverable_error_codes = [ "ServiceUnavailable", "Throttling", "RequestExpired", "ProvisionedThroughputExceededException" - ] + ].freeze - attr_accessor :aws_region, :stream - - def initialize(opts) - super - @access_key_id = opts[:aws_access_key_id] || ENV['AWS_ACCESS_KEY_ID'] - @secret_access_key = opts[:aws_secret_access_key] || ENV['AWS_SECRET_ACCESS_KEY'] - @aws_region = opts[:aws_region] || DEFAULT_REGION - @stream = opts[:stream] || DEFAULT_STREAM + def transform_message(message) + { + data: message, + partition_key: SecureRandom.uuid + } end - def connect - @io = ::Aws::Kinesis::Client.new( - region: @aws_region, - credentials: ::Aws::Credentials.new(@access_key_id, @secret_access_key) - ) + def put_records(records) + @io.put_records({ + records: records, + stream_name: @stream + }) end - def with_connection - connect unless connected? - yield - rescue => e - log_error(e) - log_warning("giving up") - close(flush: false) + def is_successful_response(resp) + resp.failed_record_count == 0 end - def write_batch(messages, group = nil) - kinesis_records = messages.map do |message| - { - data: message, - partition_key: SecureRandom.uuid - } - end - - with_connection do - resp = @io.put_records({ - records: kinesis_records, - stream_name: @stream - }) - - # Put any failed records back into the buffer - if resp.failed_record_count != 0 - resp.records.each_with_index do |record, index| - if RECOVERABLE_ERROR_CODES.include?(record.error_code) - log_warning("Failed to post record to kinesis with error: #{record.error_code} #{record.error_message}") - log_warning("Retrying") - write(kinesis_records[index][:data]) - elsif !record.error_code.nil? && record.error_code != '' - log_error("Failed to post record to kinesis with error: #{record.error_code} #{record.error_message}") - end - end - end - end + def get_response_records(resp) + resp.records end - def write_one(message) - write_batch([message]) - end - - def close! - @io = nil - end end end end