lib/wukong/processor.rb in wukong-3.0.0.pre vs lib/wukong/processor.rb in wukong-3.0.0.pre2

- old
+ new

@@ -1,142 +1,88 @@ -require 'vayacondios-client' +require 'log4r' -Settings.define :monitor_interval, :default => 50_000, :type => Integer - module Wukong - class ProcessorError < StandardError ; end + class ProcessorBuilder < Hanuman::StageBuilder + def namespace(*args) + args.first.is_a?(Class) ? args.first : Wukong::Processor + end + end - class Processor < Hanuman::Action - include Hanuman::IsOwnInputSlot - include Hanuman::IsOwnOutputSlot + # The Processor is the basic unit of computation in Wukong. A + # processor can be thought of as an arbitrary function that takes + # certain inputs and produces certain (or no) outputs. + # + # A Processor can be written and tested purely in Ruby and on your + # local machine. You can glue processors together + class Processor < Hanuman::Stage + + field :action, Whatever + field :log, Whatever, :default => -> { log = Log4r::Logger.new(self.class.to_s) ; log.outputters = Log4r::StdoutOutputter.new('stdout', formatter: Log4r::PatternFormatter.new(pattern: "%d [%l] %c: %m")) ; log } + field :notifier, Vayacondios::NotifierFactory, :default => Vayacondios.default_notifier - field :name, Symbol, :default => ->{ self.class.handle } - field :count, Integer, doc: 'Number of records seen this run', default: 0 - - # override this in your subclass - def process(record) + def self.describe desc + @description = desc end - # passes a record on down the line - def emit(record) - self.count += 1 - if (count % Settings.monitor_interval.to_i == 0) - log.info "emit\t%-23s\t%-47s\t%s" % [self.class, self.inspect, record.inspect] - end - output.process(record) - rescue Wukong::ProcessorError - raise - rescue StandardError => err - next_block = output.name rescue "(bad stage)" - log.warn "#{self}: error emitting #{next_block}: #{err.message}" - raise Wukong::ProcessorError, err.message, err.backtrace + def self.description + @description end - def bad_record(*args) - BadRecord.make(*args) + def self.consumes label end - def self.register_processor(name=nil, &block) - register_action(name, &block) + def self.produces label end + + # This is a placeholder method intended to be overridden + def perform_action(*args) ; end - include Vayacondios::Notifications - - class_attribute :log - self.log = Log - - config :error_handler, Vayacondios::NotifierFactory, :default => ->{ Vayacondios::NotifierFactory.receive(type: 'log', log: self.log) } - - def bad_record(record, options = {}) - error_handler.notify(record, options.merge(level: 'error')) + # The action attribute is turned into the perform action method + def receive_action(action) + self.define_singleton_method(:perform_action, &action) end - end - class AsIs < Processor - # accepts records, emits as-is - def process(*args) - emit(*args) + # Valid notifier types are currently :http or :log + # This processor's log is passed to vayacondios + def receive_notifier(type) + if type.is_a?(Hash) + @notifier = Vayacondios::NotifierFactory.receive({type: 'log'}.merge(type)) + else + @notifier = Vayacondios::NotifierFactory.receive(type: type, log: log) + end end - register_processor - end - class Null < Processor - self.register_processor - - # accepts records, emits none - def process(*) - # ze goggles... zey do nussing! + # Send information to Vayacondios; data goes in, the right thing happens + def notify(topic, cargo) + notifier.notify(topic, cargo) end - end - # - # Foreach calls a block on every record, and depends on the block to call - # emit. You can emit one record, many records, or no records, and with any - # contents. If you'll always emit exactly one record out per record in, - # you may prefer Wukong::Widget::Map. - # - # @example regenerate a wordbag with counts matching the original - # foreach{|rec| rec.count.times{ emit(rec.word) } } - # - # @see Project - # @see Map - class Foreach < Processor - self.register_processor - - # @param [Proc] proc used for body of process method - # @yield ... or supply it as a &block arg. - def initialize(prc=nil, &block) - prc ||= block or raise "Please supply a proc or a block to #{self.class}.new" - define_singleton_method(:process, prc) + # This method is called after the processor class has been instantiated + # but before any records are given to it to process + def setup end - def self.make(workflow, *args, &block) - obj = new(*args, &block) - workflow.add_stage obj - obj + # This method is called once per record + # Override this in your subclass + def process(record, &emit) + yield record end - end - # - # Evaluates the block and emits the result if non-nil - # - # @example turn a record into a tuple - # map{|rec| rec.attributes.values } - # - # @example pass along first matching term, drop on the floor otherwise - # map{|str| str[/\b(love|hate|happy|sad)\b/] } - # - class Map < Processor - self.register_processor - attr_reader :blk - - # @param [Proc] proc to delegate for call - # @yield if proc is omitted, block must be supplied - def initialize(blk=nil, &block) - @blk = blk || block or raise "Please supply a proc or a block to #{self.class}.new" + # This method is called to signal the last record has been + # received but that further processing may still be done, events + # still be yielded, &c. + # + # This can be used within an aggregating processor (like a reducer + # in a map/reduce job) to start processing the final aggregate of + # records since the "last record" has already been received. + def finalize end - def process(*args) - result = blk.call(*args) - emit result unless result.nil? + # This method is called after all records have been passed. It + # signals that processing should stop. + + # This method is called after all records have been processed + def stop end - def self.make(workflow, *args, &block) - obj = new(*args, &block) - workflow.add_stage obj - obj - end - end - - # - # Flatten emits each item in an enumerable as its own record - # - # @example turn a document into all its words - # input > map{|line| line.split(/\W+/) } > flatten > output - class Flatten < Processor - self.register_processor - - def process(iter) - iter.each{|*args| emit(*args) } - end end end