lib/wukong/processor.rb in wukong-3.0.0.pre vs lib/wukong/processor.rb in wukong-3.0.0.pre2
- old
+ new
@@ -1,142 +1,88 @@
-require 'vayacondios-client'
+require 'log4r'
-Settings.define :monitor_interval, :default => 50_000, :type => Integer
module Wukong
- class ProcessorError < StandardError ; end
+ class ProcessorBuilder < Hanuman::StageBuilder # Stage builder specialised for Wukong processors.
+ def namespace(*args) # Returns the first argument when it is a Class; otherwise falls back to Wukong::Processor.
+ args.first.is_a?(Class) ? args.first : Wukong::Processor
+ end
+ end
- class Processor < Hanuman::Action
- include Hanuman::IsOwnInputSlot
- include Hanuman::IsOwnOutputSlot
+ # The Processor is the basic unit of computation in Wukong. A
+ # processor can be thought of as an arbitrary function that takes
+ # certain inputs and produces certain (or no) outputs.
+ #
+ # A Processor can be written and tested purely in Ruby and on your
+ # local machine. You can glue processors together.
+ class Processor < Hanuman::Stage
+ field :action, Whatever
+ field :log, Whatever, :default => -> { log = ; log.outputters ='stdout', formatter: "%d [%l] %c: %m")) ; log }
+ field :notifier, Vayacondios::NotifierFactory, :default => Vayacondios.default_notifier
- field :name, Symbol, :default => ->{ self.class.handle }
- field :count, Integer, doc: 'Number of records seen this run', default: 0
- # override this in your subclass
- def process(record)
+ def self.describe desc
+ @description = desc
- # passes a record on down the line
- def emit(record)
- self.count += 1
- if (count % Settings.monitor_interval.to_i == 0)
- "emit\t%-23s\t%-47s\t%s" % [self.class, self.inspect, record.inspect]
- end
- output.process(record)
- rescue Wukong::ProcessorError
- raise
- rescue StandardError => err
- next_block = rescue "(bad stage)"
- log.warn "#{self}: error emitting #{next_block}: #{err.message}"
- raise Wukong::ProcessorError, err.message, err.backtrace
+ def self.description
+ @description
- def bad_record(*args)
- BadRecord.make(*args)
+ def self.consumes label
- def self.register_processor(name=nil, &block)
- register_action(name, &block)
+ def self.produces label
+ # Placeholder no-op that accepts and ignores any arguments; intended to be overridden (receive_action redefines it on the instance).
+ def perform_action(*args) ; end
- include Vayacondios::Notifications
- class_attribute :log
- self.log = Log
- config :error_handler, Vayacondios::NotifierFactory, :default => ->{ Vayacondios::NotifierFactory.receive(type: 'log', log: self.log) }
- def bad_record(record, options = {})
- error_handler.notify(record, options.merge(level: 'error'))
+ # The action attribute is turned into this instance's perform_action method
+ def receive_action(action)
+ self.define_singleton_method(:perform_action, &action)
- end
- class AsIs < Processor
- # accepts records, emits as-is
- def process(*args)
- emit(*args)
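+ # Valid notifier types are currently :http or :log.
+ # Given a bare type, this processor's log is passed to Vayacondios along with it.
+ # Given a Hash, the Hash is merged over a default of type: 'log'.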
+ def receive_notifier(type)
+ if type.is_a?(Hash)
+ @notifier = Vayacondios::NotifierFactory.receive({type: 'log'}.merge(type))
+ else
+ @notifier = Vayacondios::NotifierFactory.receive(type: type, log: log)
+ end
- register_processor
- end
- class Null < Processor
- self.register_processor
- # accepts records, emits none
- def process(*)
- # ze goggles... zey do nussing!
+ # Send information to Vayacondios by forwarding the topic and cargo to this processor's notifier
+ def notify(topic, cargo)
+ notifier.notify(topic, cargo)
- end
- #
- # Foreach calls a block on every record, and depends on the block to call
- # emit. You can emit one record, many records, or no records, and with any
- # contents. If you'll always emit exactly one record out per record in,
- # you may prefer Wukong::Widget::Map.
- #
- # @example regenerate a wordbag with counts matching the original
- # foreach{|rec| rec.count.times{ emit(rec.word) } }
- #
- # @see Project
- # @see Map
- class Foreach < Processor
- self.register_processor
- # @param [Proc] proc used for body of process method
- # @yield ... or supply it as a &block arg.
- def initialize(prc=nil, &block)
- prc ||= block or raise "Please supply a proc or a block to #{self.class}.new"
- define_singleton_method(:process, prc)
+ # This method is called after the processor class has been instantiated
+ # but before any records are given to it to process
+ def setup
- def self.make(workflow, *args, &block)
- obj = new(*args, &block)
- workflow.add_stage obj
- obj
+ # This method is called once per record
+ # Override this in your subclass
+ def process(record, &emit)
+ yield record
- end
- #
- # Evaluates the block and emits the result if non-nil
- #
- # @example turn a record into a tuple
- # map{|rec| rec.attributes.values }
- #
- # @example pass along first matching term, drop on the floor otherwise
- # map{|str| str[/\b(love|hate|happy|sad)\b/] }
- #
- class Map < Processor
- self.register_processor
- attr_reader :blk
- # @param [Proc] proc to delegate for call
- # @yield if proc is omitted, block must be supplied
- def initialize(blk=nil, &block)
- @blk = blk || block or raise "Please supply a proc or a block to #{self.class}.new"
+ # This method is called to signal that the last record has been
+ # received but that further processing may still be done, events
+ # may still be yielded, etc.
+ #
+ # This can be used within an aggregating processor (like a reducer
+ # in a map/reduce job) to start processing the final aggregate of
+ # records since the "last record" has already been received.
+ def finalize
- def process(*args)
- result =*args)
- emit result unless result.nil?
+ # This method is called after all records have been processed. It
+ # signals that processing should stop.
+ def stop
- def self.make(workflow, *args, &block)
- obj = new(*args, &block)
- workflow.add_stage obj
- obj
- end
- end
- #
- # Flatten emits each item in an enumerable as its own record
- #
- # @example turn a document into all its words
- # input > map{|line| line.split(/\W+/) } > flatten > output
- class Flatten < Processor
- self.register_processor
- def process(iter)
- iter.each{|*args| emit(*args) }
- end