Sha256: 74f21332f8277ed0d858d057d8e6e648c4323eba73948301ea515a70aa1acd0a

Contents?: true

Size: 1.02 KB

Versions: 1

Compression:

Stored size: 1.02 KB

Contents

require 'fluent/plugin/input'
require 'grpc'
require 'event_services_pb'

module Fluent::Plugin
  class GrpcInput < Input
    Fluent::Plugin.register_input('grpc', self)

    helpers :thread

    desc 'The address to bind to.'
    config_param :bind, :string, default: '0.0.0.0'
    desc 'The port to listen to.'
    config_param :port, :integer, default: 50051

    class ForwarderServer < Event::Forwarder::Service
      def forward(event, _unused_call)
        $stdout.puts event.name
        Event::Ack.new(type: "received")
      end
    end

    public
    def configure(conf)
      super

      if @port < 1024
        raise Fluent::ConfigError, "well known ports cannot be used for this purpose."
      end
    end

    public
    def start
      super

      @s = GRPC::RpcServer.new
      @s.add_http2_port("#{@bind}:#{@port}", :this_port_is_insecure)
      @s.handle(ForwarderServer)
      thread_create :in_grpc_server do
        @s.run
      end
    end

    public
    def shutdown
      @s.stop

      super
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-grpc-0.0.2 lib/fluent/plugin/in_grpc.rb