lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.4.6 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.0.0
- old
+ new
@@ -1,24 +1,21 @@
require 'json'
require 'webrick'
-require 'fluent/input'
-require 'fluent/parser'
+require 'fluent/plugin/input'
+require 'fluent/plugin/parser'
require 'fluent/plugin/gcloud_pubsub/client'
-module Fluent
+module Fluent::Plugin
class GcloudPubSubInput < Input
Fluent::Plugin.register_input('gcloud_pubsub', self)
- class << self
- unless method_defined?(:desc)
- def desc(description)
- end
- end
- end
+ helpers :compat_parameters, :parser, :thread
+ DEFAULT_PARSER_TYPE = 'json'
+
class FailedParseError < StandardError
end
desc 'Set tag of messages.'
config_param :tag, :string
@@ -39,29 +36,25 @@
desc 'Setting `true`, keepalive connection to wait for new messages.'
config_param :return_immediately, :bool, default: true
desc 'Set number of threads to pull messages.'
config_param :pull_threads, :integer, default: 1
desc 'Set input format.'
- config_param :format, :string, default: 'json'
+ config_param :format, :string, default: DEFAULT_PARSER_TYPE
desc 'Set error type when parsing messages fails.'
config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning]
# for HTTP RPC
desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.'
config_param :enable_rpc, :bool, default: false
desc 'Bind IP address for HTTP RPC.'
config_param :rpc_bind, :string, default: '0.0.0.0'
desc 'Port for HTTP RPC.'
config_param :rpc_port, :integer, default: 24680
- unless method_defined?(:log)
- define_method("log") { $log }
+ config_section :parse do
+ config_set_default :@type, DEFAULT_PARSER_TYPE
end
- unless method_defined?(:router)
- define_method("router") { Fluent::Engine }
- end
-
class RPCServlet < WEBrick::HTTPServlet::AbstractServlet
class Error < StandardError; end
def initialize(server, plugin)
super
@@ -106,10 +99,11 @@
render_json(200, ret)
end
end
def configure(conf)
+ compat_parameters_convert(conf, :parser)
super
@rpc_srv = nil
@rpc_thread = nil
@stop_pull = false
@@ -117,12 +111,11 @@
method(:static_tag)
else
method(:dynamic_tag)
end
- @parser = Plugin.new_parser(@format)
- @parser.configure(conf)
+ @parser = parser_create
end
def start
super
start_rpc if @enable_rpc
@@ -131,27 +124,26 @@
log.debug "connected subscription:#{@subscription} in project #{@project}"
@emit_guard = Mutex.new
@stop_subscribing = false
@subscribe_threads = []
- @pull_threads.times do
- @subscribe_threads.push Thread.new(&method(:subscribe))
+ @pull_threads.times do |idx|
+ @subscribe_threads.push thread_create("in_gcloud_pubsub_subscribe_#{idx}".to_sym, &method(:subscribe))
end
end
def shutdown
- super
if @rpc_srv
@rpc_srv.shutdown
@rpc_srv = nil
end
if @rpc_thread
- @rpc_thread.join
@rpc_thread = nil
end
@stop_subscribing = true
@subscribe_threads.each(&:join)
+ super
end
def stop_pull
@stop_pull = true
log.info "stop pull from subscription:#{@subscription}"
@@ -185,11 +177,11 @@
Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
AccessLog: []
}
)
@rpc_srv.mount('/api/in_gcloud_pubsub/pull/', RPCServlet, self)
- @rpc_thread = Thread.new {
+ @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread){
@rpc_srv.start
}
end
def subscribe
@@ -223,10 +215,10 @@
log.error_backtrace ex.backtrace
end
def process(messages)
event_streams = Hash.new do |hsh, key|
- hsh[key] = MultiEventStream.new
+ hsh[key] = Fluent::MultiEventStream.new
end
messages.each do |m|
line = m.message.data.chomp
@parser.parse(line) do |time, record|