lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.0 vs lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.1

- old
+ new

@@ -13,10 +13,13 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # + +require 'fluent/input' + module Fluent class BeatsInput < Input Plugin.register_input('beats', self) def initialize @@ -67,12 +70,10 @@ @lumberjack.close rescue nil @thread_pool.shutdown @thread.join end - FILEBEAT_MESSAGE = 'message' - def run until @lumberjack.closed? conn = @lumberjack.accept next if conn.nil? @@ -89,10 +90,10 @@ @thread_pool.post { begin conn.run { |map| tag = @metadata_as_tag ? map['@metadata']['beat'] : @tag - if map.has_key?('message') && @format + if map.has_key?('message') && @format message = map.delete('message') @parser.parse(message) { |time, record| record['@timestamp'] = map['@timestamp'] map.each { |k, v| record[k] = v