lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.0 vs lib/fluent/plugin/in_beats.rb in fluent-plugin-beats-0.1.1
- old
+ new
@@ -13,10 +13,13 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+require 'fluent/input'
+
module Fluent
class BeatsInput < Input
Plugin.register_input('beats', self)
def initialize
@@ -67,12 +70,10 @@
@lumberjack.close rescue nil
@thread_pool.shutdown
@thread.join
end
- FILEBEAT_MESSAGE = 'message'
-
def run
until @lumberjack.closed?
conn = @lumberjack.accept
next if conn.nil?
@@ -89,10 +90,10 @@
@thread_pool.post {
begin
conn.run { |map|
tag = @metadata_as_tag ? map['@metadata']['beat'] : @tag
- if map.has_key?('message') && @format
+ if map.has_key?('message') && @format
message = map.delete('message')
@parser.parse(message) { |time, record|
record['@timestamp'] = map['@timestamp']
map.each { |k, v|
record[k] = v