lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.2.0 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.0
- old
+ new
@@ -16,10 +16,11 @@
config_param :topic, :string
config_param :subscription, :string
config_param :pull_interval, :float, default: 5.0
config_param :max_messages, :integer, default: 100
config_param :return_immediately, :bool, default: true
+ config_param :pull_threads, :integer, default: 1
config_param :format, :string, default: 'json'
# for HTTP RPC
config_param :enable_rpc, :bool, default: false
config_param :rpc_bind, :string, default: '0.0.0.0'
config_param :rpc_port, :integer, default: 24680
@@ -92,11 +93,14 @@
@subscriber = Fluent::GcloudPubSub::Subscriber.new @project, @key, @topic, @subscription
log.debug "connected subscription:#{@subscription} in project #{@project}"
@stop_subscribing = false
- @subscribe_thread = Thread.new(&method(:subscribe))
+ @subscribe_threads = []
+ @pull_threads.times do
+ @subscribe_threads.push Thread.new(&method(:subscribe))
+ end
end
def shutdown
super
if @rpc_srv
@@ -106,10 +110,10 @@
if @rpc_thread
@rpc_thread.join
@rpc_thread = nil
end
@stop_subscribing = true
- @subscribe_thread.join
+ @subscribe_threads.each(&:join)
end
def stop_pull
@stop_pull = true
log.info "stop pull from subscription:#{@subscription}"