require 'vayacondios-client' Settings.define :monitor_interval, :default => 50_000, :type => Integer module Wukong class ProcessorError < StandardError ; end class Processor < Hanuman::Action include Hanuman::IsOwnInputSlot include Hanuman::IsOwnOutputSlot field :name, Symbol, :default => ->{ self.class.handle } field :count, Integer, doc: 'Number of records seen this run', default: 0 # override this in your subclass def process(record) end # passes a record on down the line def emit(record) self.count += 1 if (count % Settings.monitor_interval.to_i == 0) log.info "emit\t%-23s\t%-47s\t%s" % [self.class, self.inspect, record.inspect] end output.process(record) rescue Wukong::ProcessorError raise rescue StandardError => err next_block = output.name rescue "(bad stage)" log.warn "#{self}: error emitting #{next_block}: #{err.message}" raise Wukong::ProcessorError, err.message, err.backtrace end def bad_record(*args) BadRecord.make(*args) end def self.register_processor(name=nil, &block) register_action(name, &block) end include Vayacondios::Notifications class_attribute :log self.log = Log config :error_handler, Vayacondios::NotifierFactory, :default => ->{ Vayacondios::NotifierFactory.receive(type: 'log', log: self.log) } def bad_record(record, options = {}) error_handler.notify(record, options.merge(level: 'error')) end end class AsIs < Processor # accepts records, emits as-is def process(*args) emit(*args) end register_processor end class Null < Processor self.register_processor # accepts records, emits none def process(*) # ze goggles... zey do nussing! end end # # Foreach calls a block on every record, and depends on the block to call # emit. You can emit one record, many records, or no records, and with any # contents. If you'll always emit exactly one record out per record in, # you may prefer Wukong::Widget::Map. # # @example regenerate a wordbag with counts matching the original # foreach{|rec| rec.count.times{ emit(rec.word) } } # # @see Project # @see Map class Foreach < Processor self.register_processor # @param [Proc] proc used for body of process method # @yield ... or supply it as a &block arg. def initialize(prc=nil, &block) prc ||= block or raise "Please supply a proc or a block to #{self.class}.new" define_singleton_method(:process, prc) end def self.make(workflow, *args, &block) obj = new(*args, &block) workflow.add_stage obj obj end end # # Evaluates the block and emits the result if non-nil # # @example turn a record into a tuple # map{|rec| rec.attributes.values } # # @example pass along first matching term, drop on the floor otherwise # map{|str| str[/\b(love|hate|happy|sad)\b/] } # class Map < Processor self.register_processor attr_reader :blk # @param [Proc] proc to delegate for call # @yield if proc is omitted, block must be supplied def initialize(blk=nil, &block) @blk = blk || block or raise "Please supply a proc or a block to #{self.class}.new" end def process(*args) result = blk.call(*args) emit result unless result.nil? end def self.make(workflow, *args, &block) obj = new(*args, &block) workflow.add_stage obj obj end end # # Flatten emits each item in an enumerable as its own record # # @example turn a document into all its words # input > map{|line| line.split(/\W+/) } > flatten > output class Flatten < Processor self.register_processor def process(iter) iter.each{|*args| emit(*args) } end end end