lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.1.4 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.2.0

- old
+ new

@@ -1,58 +1,150 @@ +require 'json' +require 'webrick' + require 'fluent/input' require 'fluent/parser' require 'fluent/plugin/gcloud_pubsub/client' module Fluent class GcloudPubSubInput < Input Fluent::Plugin.register_input('gcloud_pubsub', self) config_param :tag, :string - config_param :project, :string, :default => nil - config_param :key, :string, :default => nil + config_param :project, :string, default: nil + config_param :key, :string, default: nil config_param :topic, :string config_param :subscription, :string - config_param :pull_interval, :float, :default => 5.0 - config_param :max_messages, :integer, :default => 100 - config_param :return_immediately, :bool, :default => true - config_param :format, :string, :default => 'json' + config_param :pull_interval, :float, default: 5.0 + config_param :max_messages, :integer, default: 100 + config_param :return_immediately, :bool, default: true + config_param :format, :string, default: 'json' + # for HTTP RPC + config_param :enable_rpc, :bool, default: false + config_param :rpc_bind, :string, default: '0.0.0.0' + config_param :rpc_port, :integer, default: 24680 unless method_defined?(:log) define_method("log") { $log } end unless method_defined?(:router) define_method("router") { Fluent::Engine } end + class RPCServlet < WEBrick::HTTPServlet::AbstractServlet + class Error < StandardError; end + + def initialize(server, plugin) + super + @plugin = plugin + end + + def do_GET(req, res) + begin + code, header, body = process(req, res) + rescue + code, header, body = render_json(500, { + 'ok' => false, + 'message' => 'Internal Server Error', + 'error' => "#{$!}", + 'backtrace'=> $!.backtrace + }) + end + + res.status = code + header.each_pair {|k,v| + res[k] = v + } + res.body = body + end + + def render_json(code, obj) + [code, {'Content-Type' => 'application/json'}, obj.to_json] + end + + def process(req, res) + case req.path_info + when '/stop' + @plugin.stop_pull + when '/start' + @plugin.start_pull + else + raise Error.new "Invalid path_info: #{req.path_info}" + end + render_json(200, {'ok' => true}) + end + end + def configure(conf) super + @rpc_srv = nil + @rpc_thread = nil + @stop_pull = false + @parser = Plugin.new_parser(@format) @parser.configure(conf) end def start super + start_rpc if @enable_rpc + @subscriber = Fluent::GcloudPubSub::Subscriber.new @project, @key, @topic, @subscription log.debug "connected subscription:#{@subscription} in project #{@project}" + @stop_subscribing = false @subscribe_thread = Thread.new(&method(:subscribe)) end def shutdown super + if @rpc_srv + @rpc_srv.shutdown + @rpc_srv = nil + end + if @rpc_thread + @rpc_thread.join + @rpc_thread = nil + end @stop_subscribing = true @subscribe_thread.join end + def stop_pull + @stop_pull = true + log.info "stop pull from subscription:#{@subscription}" + end + + def start_pull + @stop_pull = false + log.info "start pull from subscription:#{@subscription}" + end + private + def start_rpc + log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/" + @rpc_srv = WEBrick::HTTPServer.new( + { + BindAddress: @rpc_bind, + Port: @rpc_port, + Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), + AccessLog: [] + } + ) + @rpc_srv.mount('/api/in_gcloud_pubsub/pull/', RPCServlet, self) + @rpc_thread = Thread.new { + @rpc_srv.start + } + end + def subscribe until @stop_subscribing - _subscribe + _subscribe unless @stop_pull - if @return_immediately + if @return_immediately || @stop_pull sleep @pull_interval end end rescue => ex log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s