lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.2.0 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.0

- old
+ new

@@ -16,10 +16,11 @@ config_param :topic, :string config_param :subscription, :string config_param :pull_interval, :float, default: 5.0 config_param :max_messages, :integer, default: 100 config_param :return_immediately, :bool, default: true + config_param :pull_threads, :integer, default: 1 config_param :format, :string, default: 'json' # for HTTP RPC config_param :enable_rpc, :bool, default: false config_param :rpc_bind, :string, default: '0.0.0.0' config_param :rpc_port, :integer, default: 24680 @@ -92,11 +93,14 @@ @subscriber = Fluent::GcloudPubSub::Subscriber.new @project, @key, @topic, @subscription log.debug "connected subscription:#{@subscription} in project #{@project}" @stop_subscribing = false - @subscribe_thread = Thread.new(&method(:subscribe)) + @subscribe_threads = [] + @pull_threads.times do + @subscribe_threads.push Thread.new(&method(:subscribe)) + end end def shutdown super if @rpc_srv @@ -106,10 +110,10 @@ if @rpc_thread @rpc_thread.join @rpc_thread = nil end @stop_subscribing = true - @subscribe_thread.join + @subscribe_threads.each(&:join) end def stop_pull @stop_pull = true log.info "stop pull from subscription:#{@subscription}"