lib/fluent/engine.rb in fluentd-0.10.36 vs lib/fluent/engine.rb in fluentd-0.10.37
- old
+ new
@@ -14,320 +14,323 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
+ class EngineClass
+ def initialize
+ @matches = []
+ @sources = []
+ @match_cache = {}
+ @match_cache_keys = []
+ @started = []
+ @default_loop = nil
+ @log_emit_thread = nil
+ @log_event_loop_stop = false
+ @log_event_queue = []
-class EngineClass
- def initialize
- @matches = []
- @sources = []
- @match_cache = {}
- @match_cache_keys = []
- @started = []
- @default_loop = nil
+ @suppress_emit_error_log_interval = 0
+ @next_emit_error_log_time = nil
+ end
- @log_emit_thread = nil
- @log_event_loop_stop = false
- @log_event_queue = []
+ MATCH_CACHE_SIZE = 1024
- @suppress_emit_error_log_interval = 0
- @next_emit_error_log_time = nil
- end
+ LOG_EMIT_INTERVAL = 0.1
- MATCH_CACHE_SIZE = 1024
+ attr_reader :matches, :sources
- LOG_EMIT_INTERVAL = 0.1
+ def init
+ BasicSocket.do_not_reverse_lookup = true
+ Plugin.load_plugins
+ if defined?(Encoding)
+ Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
+ Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
+ end
+ self
+ end
- attr_reader :matches, :sources
+ def suppress_interval(interval_time)
+ @suppress_emit_error_log_interval = interval_time
+ @next_emit_error_log_time = Time.now.to_i
+ end
- def init
- BasicSocket.do_not_reverse_lookup = true
- Plugin.load_plugins
- if defined?(Encoding)
- Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
- Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
+ def read_config(path)
+ $log.info "reading config file", :path=>path
+ File.open(path) {|io|
+ parse_config(io, File.basename(path), File.dirname(path))
+ }
end
- self
- end
- def suppress_interval(interval_time)
- @suppress_emit_error_log_interval = interval_time
- @next_emit_error_log_time = Time.now.to_i
- end
+ def parse_config(io, fname, basepath=Dir.pwd)
+ conf = if fname =~ /\.rb$/
+ Config::DSL::DSLParser.parse(io, fname)
+ else
+ Config.parse(io, fname, basepath)
+ end
+ configure(conf)
+ conf.check_not_fetched {|key,e|
+ $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
+ }
+ end
- def read_config(path)
- $log.info "reading config file", :path=>path
- File.open(path) {|io|
- parse_config(io, File.basename(path), File.dirname(path))
- }
- end
+ def configure(conf)
+ $log.info "using configuration file: #{conf.to_s.rstrip}"
- def parse_config(io, fname, basepath=Dir.pwd)
- conf = Config.parse(io, fname, basepath)
- configure(conf)
- conf.check_not_fetched {|key,e|
- $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
- }
- end
+ conf.elements.select {|e|
+ e.name == 'source'
+ }.each {|e|
+ type = e['type']
+ unless type
+ raise ConfigError, "Missing 'type' parameter on <source> directive"
+ end
+ $log.info "adding source type=#{type.dump}"
- def configure(conf)
- $log.info "using configuration file: #{conf.to_s.rstrip}"
+ input = Plugin.new_input(type)
+ input.configure(e)
- conf.elements.select {|e|
- e.name == 'source'
- }.each {|e|
- type = e['type']
- unless type
- raise ConfigError, "Missing 'type' parameter on <source> directive"
- end
- $log.info "adding source type=#{type.dump}"
+ @sources << input
+ }
- input = Plugin.new_input(type)
- input.configure(e)
+ conf.elements.select {|e|
+ e.name == 'match'
+ }.each {|e|
+ type = e['type']
+ pattern = e.arg
+ unless type
+ raise ConfigError, "Missing 'type' parameter on <match #{e.arg}> directive"
+ end
+ $log.info "adding match", :pattern=>pattern, :type=>type
- @sources << input
- }
+ output = Plugin.new_output(type)
+ output.configure(e)
- conf.elements.select {|e|
- e.name == 'match'
- }.each {|e|
- type = e['type']
- pattern = e.arg
- unless type
- raise ConfigError, "Missing 'type' parameter on <match #{e.arg}> directive"
- end
- $log.info "adding match", :pattern=>pattern, :type=>type
+ match = Match.new(pattern, output)
+ @matches << match
+ }
+ end
- output = Plugin.new_output(type)
- output.configure(e)
+ def load_plugin_dir(dir)
+ Plugin.load_plugin_dir(dir)
+ end
- match = Match.new(pattern, output)
- @matches << match
- }
- end
+ def emit(tag, time, record)
+ unless record.nil?
+ emit_stream tag, OneEventStream.new(time, record)
+ end
+ end
- def load_plugin_dir(dir)
- Plugin.load_plugin_dir(dir)
- end
+ def emit_array(tag, array)
+ emit_stream tag, ArrayEventStream.new(array)
+ end
- def emit(tag, time, record)
- emit_stream tag, OneEventStream.new(time, record)
- end
-
- def emit_array(tag, array)
- emit_stream tag, ArrayEventStream.new(array)
- end
-
- def emit_stream(tag, es)
- target = @match_cache[tag]
- unless target
- target = match(tag) || NoMatchMatch.new
- # this is not thread-safe but inconsistency doesn't
- # cause serious problems while locking causes.
- if @match_cache_keys.size >= MATCH_CACHE_SIZE
- @match_cache_keys.delete @match_cache_keys.shift
+ def emit_stream(tag, es)
+ target = @match_cache[tag]
+ unless target
+ target = match(tag) || NoMatchMatch.new
+ # this is not thread-safe but inconsistency doesn't
+ # cause serious problems while locking causes.
+ if @match_cache_keys.size >= MATCH_CACHE_SIZE
+ @match_cache_keys.delete @match_cache_keys.shift
+ end
+ @match_cache[tag] = target
+ @match_cache_keys << tag
end
- @match_cache[tag] = target
- @match_cache_keys << tag
+ target.emit(tag, es)
+ rescue => e
+ if @suppress_emit_error_log_interval == 0 || now > @next_emit_error_log_time
+ $log.warn "emit transaction failed ", :error_class=>e.class, :error=>e
+ $log.warn_backtrace
+ # $log.debug "current next_emit_error_log_time: #{Time.at(@next_emit_error_log_time)}"
+ @next_emit_error_log_time = Time.now.to_i + @suppress_emit_error_log_interval
+ # $log.debug "next emit failure log suppressed"
+ # $log.debug "next logged time is #{Time.at(@next_emit_error_log_time)}"
+ end
+ raise
end
- target.emit(tag, es)
- rescue => e
- if @suppress_emit_error_log_interval == 0 || now > @next_emit_error_log_time
- $log.warn "emit transaction failed ", :error_class=>e.class, :error=>e
- $log.warn_backtrace
- # $log.debug "current next_emit_error_log_time: #{Time.at(@next_emit_error_log_time)}"
- @next_emit_error_log_time = Time.now.to_i + @suppress_emit_error_log_interval
- # $log.debug "next emit failure log suppressed"
- # $log.debug "next logged time is #{Time.at(@next_emit_error_log_time)}"
+
+ def match(tag)
+ @matches.find {|m| m.match(tag) }
end
- raise
- end
- def match(tag)
- @matches.find {|m| m.match(tag) }
- end
+ def match?(tag)
+ !!match(tag)
+ end
- def match?(tag)
- !!match(tag)
- end
+ def flush!
+ flush_recursive(@matches)
+ end
- def flush!
- flush_recursive(@matches)
- end
+ def now
+ # TODO thread update
+ Time.now.to_i
+ end
- def now
- # TODO thread update
- Time.now.to_i
- end
+ def log_event_loop
+ $log.disable_events(Thread.current)
- def log_event_loop
- $log.disable_events(Thread.current)
+ while sleep(LOG_EMIT_INTERVAL)
+ break if @log_event_loop_stop
+ next if @log_event_queue.empty?
- while sleep(LOG_EMIT_INTERVAL)
- break if @log_event_loop_stop
- next if @log_event_queue.empty?
+ # NOTE: thead-safe of slice! depends on GVL
+ events = @log_event_queue.slice!(0..-1)
+ next if events.empty?
- # NOTE: thead-safe of slice! depends on GVL
- events = @log_event_queue.slice!(0..-1)
- next if events.empty?
+ events.each {|tag,time,record|
+ begin
+ Engine.emit(tag, time, record)
+ rescue => e
+ $log.error "failed to emit fluentd's log event", :tag => tag, :event => record, :error_class => e.class, :error => e
+ end
+ }
+ end
+ end
- events.each {|tag,time,record|
- begin
- Engine.emit(tag, time, record)
- rescue => e
- $log.error "failed to emit fluentd's log event", :tag => tag, :event => record, :error_class => e.class, :error => e
+ def run
+ begin
+ start
+
+ if match?($log.tag)
+ $log.enable_event
+ @log_emit_thread = Thread.new(&method(:log_event_loop))
end
- }
- end
- end
- def run
- begin
- start
+ # for empty loop
+ @default_loop = Coolio::Loop.default
+ @default_loop.attach Coolio::TimerWatcher.new(1, true)
+ # TODO attach async watch for thread pool
+ @default_loop.run
- if match?($log.tag)
- $log.enable_event
- @log_emit_thread = Thread.new(&method(:log_event_loop))
+ rescue => e
+ $log.error "unexpected error", :error_class=>e.class, :error=>e
+ $log.error_backtrace
+ ensure
+ $log.info "shutting down fluentd"
+ shutdown
+ if @log_emit_thread
+ @log_event_loop_stop = true
+ @log_emit_thread.join
+ end
end
+ end
- # for empty loop
- @default_loop = Coolio::Loop.default
- @default_loop.attach Coolio::TimerWatcher.new(1, true)
- # TODO attach async watch for thread pool
- @default_loop.run
-
- rescue => e
- $log.error "unexpected error", :error_class=>e.class, :error=>e
- $log.error_backtrace
- ensure
- $log.info "shutting down fluentd"
- shutdown
- if @log_emit_thread
- @log_event_loop_stop = true
- @log_emit_thread.join
+ def stop
+ if @default_loop
+ @default_loop.stop
+ @default_loop = nil
end
+ nil
end
- end
- def stop
- if @default_loop
- @default_loop.stop
- @default_loop = nil
+ def push_log_event(tag, time, record)
+ return if @log_emit_thread.nil?
+ @log_event_queue.push([tag, time, record])
end
- nil
- end
- def push_log_event(tag, time, record)
- return if @log_emit_thread.nil?
- @log_event_queue.push([tag, time, record])
- end
+ private
+ def start
+ @matches.each {|m|
+ m.start
+ @started << m
+ }
+ @sources.each {|s|
+ s.start
+ @started << s
+ }
+ end
- private
- def start
- @matches.each {|m|
- m.start
- @started << m
- }
- @sources.each {|s|
- s.start
- @started << s
- }
- end
+ def shutdown
+ @started.map {|s|
+ Thread.new do
+ begin
+ s.shutdown
+ rescue => e
+ $log.warn "unexpected error while shutting down", :error_class=>e.class, :error=>e
+ $log.warn_backtrace
+ end
+ end
+ }.each {|t|
+ t.join
+ }
+ end
- def shutdown
- @started.map {|s|
- Thread.new do
+ def flush_recursive(array)
+ array.each {|m|
begin
- s.shutdown
+ if m.is_a?(Match)
+ m = m.output
+ end
+ if m.is_a?(BufferedOutput)
+ m.force_flush
+ elsif m.is_a?(MultiOutput)
+ flush_recursive(m.outputs)
+ end
rescue => e
- $log.warn "unexpected error while shutting down", :error_class=>e.class, :error=>e
- $log.warn_backtrace
+ $log.debug "error while force flushing", :error_class=>e.class, :error=>e
+ $log.debug_backtrace
end
+ }
+ end
+
+ class NoMatchMatch
+ def initialize
+ @count = 0
end
- }.each {|t|
- t.join
- }
- end
- def flush_recursive(array)
- array.each {|m|
- begin
- if m.is_a?(Match)
- m = m.output
+ def emit(tag, es)
+ # TODO use time instead of num of records
+ c = (@count += 1)
+ if c < 512
+ if Math.log(c) / Math.log(2) % 1.0 == 0
+ $log.warn "no patterns matched", :tag=>tag
+ return
+ end
+ else
+ if c % 512 == 0
+ $log.warn "no patterns matched", :tag=>tag
+ return
+ end
end
- if m.is_a?(BufferedOutput)
- m.force_flush
- elsif m.is_a?(MultiOutput)
- flush_recursive(m.outputs)
- end
- rescue => e
- $log.debug "error while force flushing", :error_class=>e.class, :error=>e
- $log.debug_backtrace
+ $log.on_trace { $log.trace "no patterns matched", :tag=>tag }
end
- }
- end
- class NoMatchMatch
- def initialize
- @count = 0
- end
+ def start
+ end
- def emit(tag, es)
- # TODO use time instead of num of records
- c = (@count += 1)
- if c < 512
- if Math.log(c) / Math.log(2) % 1.0 == 0
- $log.warn "no patterns matched", :tag=>tag
- return
- end
- else
- if c % 512 == 0
- $log.warn "no patterns matched", :tag=>tag
- return
- end
+ def shutdown
end
- $log.on_trace { $log.trace "no patterns matched", :tag=>tag }
- end
- def start
+ def match(tag)
+ false
+ end
end
-
- def shutdown
- end
-
- def match(tag)
- false
- end
end
-end
-Engine = EngineClass.new
+ Engine = EngineClass.new
-module Test
- @@test = false
+ module Test
+ @@test = false
- def test?
- @@test
- end
+ def test?
+ @@test
+ end
- def self.setup
- @@test = true
+ def self.setup
+ @@test = true
- Fluent.__send__(:remove_const, :Engine)
- engine = Fluent.const_set(:Engine, EngineClass.new).init
+ Fluent.__send__(:remove_const, :Engine)
+ engine = Fluent.const_set(:Engine, EngineClass.new).init
- engine.define_singleton_method(:now=) {|n|
- @now = n.to_i
- }
- engine.define_singleton_method(:now) {
- @now || super()
- }
+ engine.define_singleton_method(:now=) {|n|
+ @now = n.to_i
+ }
+ engine.define_singleton_method(:now) {
+ @now || super()
+ }
- nil
+ nil
+ end
end
-end
-
end