Sha256: 1246b8e1a02a07276c3d693b67521bde3496cc5a5a7e9e39ed47b9dad4305ab7

Contents?: true

Size: 1.86 KB

Versions: 4

Compression:

Stored size: 1.86 KB

Contents

module Fluent
  class CloudwatchLogsInput < Input
    Plugin.register_input('cloudwatch_logs', self)

    config_param :aws_key_id, :string, :default => nil
    config_param :aws_sec_key, :string, :default => nil
    config_param :region, :string, :default => nil
    config_param :tag, :string
    config_param :log_group_name, :string
    config_param :log_stream_name, :string
    config_param :state_file, :string
    config_param :fetch_interval, :time, default: 60

    def initialize
      super

      require 'aws-sdk-core'
    end

    def start
      options = {}
      options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key
      options[:region] = @region if @region
      @logs = Aws::CloudWatchLogs::Client.new(options)

      @finished = false
      @thread = Thread.new(&method(:run))
    end

    def shutdown
      @finished = true
      @thread.join
    end

    private
    def next_token
      return nil unless File.exist?(@state_file)
      File.read(@state_file).chomp
    end

    def store_next_token(token)
      open(@state_file, 'w') do |f|
        f.write token
      end
    end

    def run
      @next_fetch_time = Time.now

      until @finished
        if Time.now > @next_fetch_time
          @next_fetch_time += @fetch_interval

          events = get_events
          events.each do |event|
            time = (event.timestamp / 1000).floor
            record = JSON.parse(event.message)
            Engine.emit(@tag, time, record)
          end
        end
        sleep 1
      end
    end

    def get_events
      request = {
        log_group_name: @log_group_name,
        log_stream_name: @log_stream_name,
      }
      request[:next_token] = next_token if next_token
      response = @logs.get_log_events(request)
      store_next_token(response.next_forward_token)

      response.events
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
fluent-plugin-cloudwatch-logs-0.0.7 lib/fluent/plugin/in_cloudwatch_logs.rb
fluent-plugin-cloudwatch-logs-0.0.6 lib/fluent/plugin/in_cloudwatch_logs.rb
fluent-plugin-cloudwatch-logs-0.0.5 lib/fluent/plugin/in_cloudwatch_logs.rb
fluent-plugin-cloudwatch-logs-0.0.4 lib/fluent/plugin/in_cloudwatch_logs.rb