lib/logstash-logger/device/kinesis.rb in logstash-logger-0.23.0 vs lib/logstash-logger/device/kinesis.rb in logstash-logger-0.24.0
- old
+ new
@@ -1,80 +1,39 @@
require 'aws-sdk'
+require 'logstash-logger/device/aws_stream'
module LogStashLogger
module Device
- class Kinesis < Connectable
-
- DEFAULT_REGION = 'us-east-1'
- DEFAULT_STREAM = 'logstash'
- RECOVERABLE_ERROR_CODES = [
+ class Kinesis < AwsStream
+ @stream_class = ::Aws::Kinesis::Client
+ @recoverable_error_codes = [
"ServiceUnavailable",
"Throttling",
"RequestExpired",
"ProvisionedThroughputExceededException"
- ]
+ ].freeze
- attr_accessor :aws_region, :stream
-
- def initialize(opts)
- super
- @access_key_id = opts[:aws_access_key_id] || ENV['AWS_ACCESS_KEY_ID']
- @secret_access_key = opts[:aws_secret_access_key] || ENV['AWS_SECRET_ACCESS_KEY']
- @aws_region = opts[:aws_region] || DEFAULT_REGION
- @stream = opts[:stream] || DEFAULT_STREAM
+ def transform_message(message)
+ {
+ data: message,
+ partition_key: SecureRandom.uuid
+ }
end
- def connect
- @io = ::Aws::Kinesis::Client.new(
- region: @aws_region,
- credentials: ::Aws::Credentials.new(@access_key_id, @secret_access_key)
- )
+ def put_records(records)
+ @io.put_records({
+ records: records,
+ stream_name: @stream
+ })
end
- def with_connection
- connect unless connected?
- yield
- rescue => e
- log_error(e)
- log_warning("giving up")
- close(flush: false)
+ def is_successful_response(resp)
+ resp.failed_record_count == 0
end
- def write_batch(messages, group = nil)
- kinesis_records = messages.map do |message|
- {
- data: message,
- partition_key: SecureRandom.uuid
- }
- end
-
- with_connection do
- resp = @io.put_records({
- records: kinesis_records,
- stream_name: @stream
- })
-
- # Put any failed records back into the buffer
- if resp.failed_record_count != 0
- resp.records.each_with_index do |record, index|
- if RECOVERABLE_ERROR_CODES.include?(record.error_code)
- log_warning("Failed to post record to kinesis with error: #{record.error_code} #{record.error_message}")
- log_warning("Retrying")
- write(kinesis_records[index][:data])
- elsif !record.error_code.nil? && record.error_code != ''
- log_error("Failed to post record to kinesis with error: #{record.error_code} #{record.error_message}")
- end
- end
- end
- end
+ def get_response_records(resp)
+ resp.records
end
- def write_one(message)
- write_batch([message])
- end
-
- def close!
- @io = nil
- end
end
end
end