lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.4.6 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.0.0

- old
+ new

@@ -1,24 +1,21 @@ require 'json' require 'webrick' -require 'fluent/input' -require 'fluent/parser' +require 'fluent/plugin/input' +require 'fluent/plugin/parser' require 'fluent/plugin/gcloud_pubsub/client' -module Fluent +module Fluent::Plugin class GcloudPubSubInput < Input Fluent::Plugin.register_input('gcloud_pubsub', self) - class << self - unless method_defined?(:desc) - def desc(description) - end - end - end + helpers :compat_parameters, :parser, :thread + DEFAULT_PARSER_TYPE = 'json' + class FailedParseError < StandardError end desc 'Set tag of messages.' config_param :tag, :string @@ -39,29 +36,25 @@ desc 'Setting `true`, keepalive connection to wait for new messages.' config_param :return_immediately, :bool, default: true desc 'Set number of threads to pull messages.' config_param :pull_threads, :integer, default: 1 desc 'Set input format.' - config_param :format, :string, default: 'json' + config_param :format, :string, default: DEFAULT_PARSER_TYPE desc 'Set error type when parsing messages fails.' config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning] # for HTTP RPC desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.' config_param :enable_rpc, :bool, default: false desc 'Bind IP address for HTTP RPC.' config_param :rpc_bind, :string, default: '0.0.0.0' desc 'Port for HTTP RPC.' config_param :rpc_port, :integer, default: 24680 - unless method_defined?(:log) - define_method("log") { $log } + config_section :parse do + config_set_default :@type, DEFAULT_PARSER_TYPE end - unless method_defined?(:router) - define_method("router") { Fluent::Engine } - end - class RPCServlet < WEBrick::HTTPServlet::AbstractServlet class Error < StandardError; end def initialize(server, plugin) super @@ -106,10 +99,11 @@ render_json(200, ret) end end def configure(conf) + compat_parameters_convert(conf, :parser) super @rpc_srv = nil @rpc_thread = nil @stop_pull = false @@ -117,12 +111,11 @@ method(:static_tag) else method(:dynamic_tag) end - @parser = Plugin.new_parser(@format) - @parser.configure(conf) + @parser = parser_create end def start super start_rpc if @enable_rpc @@ -131,27 +124,26 @@ log.debug "connected subscription:#{@subscription} in project #{@project}" @emit_guard = Mutex.new @stop_subscribing = false @subscribe_threads = [] - @pull_threads.times do - @subscribe_threads.push Thread.new(&method(:subscribe)) + @pull_threads.times do |idx| + @subscribe_threads.push thread_create("in_gcloud_pubsub_subscribe_#{idx}".to_sym, &method(:subscribe)) end end def shutdown - super if @rpc_srv @rpc_srv.shutdown @rpc_srv = nil end if @rpc_thread - @rpc_thread.join @rpc_thread = nil end @stop_subscribing = true @subscribe_threads.each(&:join) + super end def stop_pull @stop_pull = true log.info "stop pull from subscription:#{@subscription}" @@ -185,11 +177,11 @@ Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), AccessLog: [] } ) @rpc_srv.mount('/api/in_gcloud_pubsub/pull/', RPCServlet, self) - @rpc_thread = Thread.new { + @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread){ @rpc_srv.start } end def subscribe @@ -223,10 +215,10 @@ log.error_backtrace ex.backtrace end def process(messages) event_streams = Hash.new do |hsh, key| - hsh[key] = MultiEventStream.new + hsh[key] = Fluent::MultiEventStream.new end messages.each do |m| line = m.message.data.chomp @parser.parse(line) do |time, record|