example/basic_extensions.rb in rflow-1.0.0a1 vs example/basic_extensions.rb in rflow-1.0.0a2

- old
+ new

@@ -1,11 +1,9 @@ # This will/should bring in available components and their schemas require 'rflow/components' require 'rflow/message' -#RFlow::Configuration.add_available_data_schema RFlow::Message::Data::AvroSchema.new('Integer', long_integer_schema) - # Example of creating and registering a data extension module SimpleDataExtension # Use this to default/verify the data in data_object def self.extended(base_data) base_data.data_object @@ -13,130 +11,34 @@ def my_method; end end RFlow::Configuration.add_available_data_extension('RFlow::Message::Data::Integer', SimpleDataExtension) - - -# Example of creating and registering a new schema -long_integer_schema = '{"type": "long"}' -RFlow::Configuration.add_available_data_type('RFlow::Message::Data::Integer', 'avro', long_integer_schema) - - -class RFlow::Components::GenerateIntegerSequence < RFlow::Component - output_port :out - output_port :even_odd_out - - def configure!(config) - @start = config['start'].to_i - @finish = config['finish'].to_i - @step = config['step'] ? config['step'].to_i : 1 - # If interval seconds is not given, it will default to 0 - @interval_seconds = config['interval_seconds'].to_i - end - - # Note that this uses the timer (sometimes with 0 interval) so as - # not to block the reactor - def run! - timer = EM::PeriodicTimer.new(@interval_seconds) do - message = RFlow::Message.new('RFlow::Message::Data::Integer') - message.data.data_object = @start - out.send_message message - if @start % 2 == 0 - even_odd_out['even'].send_message message - else - even_odd_out['odd'].send_message message - end - even_odd_out.send_message message - - @start += @step - timer.cancel if @start > @finish - end - end - -end - -class RFlow::Components::Replicate < RFlow::Component - input_port :in - output_port :out - output_port :errored - - def process_message(input_port, input_port_key, connection, message) - puts "Processing message in Replicate" - out.each do |connections| - puts "Replicating" - begin - connections.send_message message - rescue Exception => e - puts "Exception #{e.message}" - errored.send_message message - end - end - end -end - -puts "Before RubyProcFilter" -class RFlow::Components::RubyProcFilter < RFlow::Component - input_port :in - output_port :filtered - output_port :dropped - output_port :errored - - - def configure!(config) - @filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }") - end - - def process_message(input_port, input_port_key, connection, message) - puts "Processing message in RubyProcFilter" - begin - if @filter_proc.call(message) - filtered.send_message message - else - dropped.send_message message - end - rescue Exception => e - puts "Attempting to send message to errored #{e.message}" - errored.send_message message - end - end -end - -puts "Before FileOutput" class RFlow::Components::FileOutput < RFlow::Component attr_accessor :output_file_path, :output_file input_port :in def configure!(config) self.output_file_path = config['output_file_path'] self.output_file = File.new output_file_path, 'w+' end - #def run!; end - def process_message(input_port, input_port_key, connection, message) - puts "About to output to a file #{output_file_path}" output_file.puts message.data.data_object.inspect output_file.flush end - def cleanup output_file.close end - end -# TODO: Ensure that all the following methods work as they are -# supposed to. This is the interface that I'm adhering to class SimpleComponent < RFlow::Component input_port :in output_port :out def configure!(config); end def run!; end def process_message(input_port, input_port_key, connection, message); end def shutdown!; end def cleanup!; end end - -