Sha256: f8902200399fcc0537d866a12c8d45e519b369c57d1567f6c2e4503572725451
Contents?: true
Size: 1.25 KB
Versions: 2
Compression:
Stored size: 1.25 KB
Contents
module Wukong module Load class SourceDriver < Wukong::Local::StdioDriver include Logging attr_accessor :index, :batch_size def post_init super() self.index = 1 self.batch_size = settings[:batch_size].to_i if settings[:batch_size] && settings[:batch_size].to_i > 0 end def self.start(label, settings={}) driver = new(:foobar, label, settings) driver.post_init period = case when settings[:period] then settings[:period] when settings[:per_sec] then (1.0 / settings[:per_sec]) rescue 1.0 else 1.0 end driver.create_event EventMachine::PeriodicTimer.new(period) { driver.create_event } end def create_event receive_line(index.to_s) self.index += 1 finalize_dataflow if self.batch_size && (self.index % self.batch_size) == 0 end # :nodoc: # # Not sure why I have to add the call to $stdout.flush at the # end of this method. Supposedly $stdout.sync is called during # the #setup method in StdoutProcessor in # wukong/widget/processors. Doesn't that do this? def process record $stdout.puts record $stdout.flush end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
wukong-load-0.1.1 | lib/wukong-load/source_driver.rb |
wukong-load-0.1.0 | lib/wukong-load/source_driver.rb |