Sha256: 2d942c833b6a74e80d7d85a4511852352e0ce430d26fb6ad5862a573b7556639

Contents?: true

Size: 727 Bytes

Versions: 2

Compression:

Stored size: 727 Bytes

Contents

require 'java'

java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
java_import 'com.cloudera.flume.core.EventSinkDecorator'

module Wukong 
  class Decorator < EventSinkDecorator

    def initialize(mapper, reducer=nil, options={})
      super(nil)
      @mapper = mapper.new
    end

    def append(e)
      line   = String.from_java_bytes(e.getBody)
      record = @mapper.recordize(line.chomp)
      @mapper.process(*record) do |output|
        processed = output.to_flat.join("\t")
        event     = EventImpl.new(processed.to_java_bytes, e.getTimestamp, e.getPriority, e.getNanos, e.getHost, e.getAttrs)
        super event
      end
    end

    def run() self ; end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
wukong-3.0.0.pre old/wukong/decorator.rb
wukong-2.0.2 lib/wukong/decorator.rb