example/basic_extensions.rb in rflow-0.0.5 vs example/basic_extensions.rb in rflow-1.0.0a1

- old
+ new

@@ -23,11 +23,11 @@ class RFlow::Components::GenerateIntegerSequence < RFlow::Component output_port :out output_port :even_odd_out - + def configure!(config) @start = config['start'].to_i @finish = config['finish'].to_i @step = config['step'] ? config['step'].to_i : 1 # If interval seconds is not given, it will default to 0 @@ -35,21 +35,21 @@ end # Note that this uses the timer (sometimes with 0 interval) so as # not to block the reactor def run! - timer = EM::PeriodicTimer.new(@interval_seconds) do + timer = EM::PeriodicTimer.new(@interval_seconds) do message = RFlow::Message.new('RFlow::Message::Data::Integer') message.data.data_object = @start out.send_message message if @start % 2 == 0 even_odd_out['even'].send_message message else even_odd_out['odd'].send_message message end even_odd_out.send_message message - + @start += @step timer.cancel if @start > @finish end end @@ -57,11 +57,11 @@ class RFlow::Components::Replicate < RFlow::Component input_port :in output_port :out output_port :errored - + def process_message(input_port, input_port_key, connection, message) puts "Processing message in Replicate" out.each do |connections| puts "Replicating" begin @@ -83,11 +83,11 @@ def configure!(config) @filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }") end - + def process_message(input_port, input_port_key, connection, message) puts "Processing message in RubyProcFilter" begin if @filter_proc.call(message) filtered.send_message message @@ -110,21 +110,21 @@ self.output_file_path = config['output_file_path'] self.output_file = File.new output_file_path, 'w+' end #def run!; end - + def process_message(input_port, input_port_key, connection, message) puts "About to output to a file #{output_file_path}" output_file.puts message.data.data_object.inspect output_file.flush end - + def cleanup output_file.close end - + end # TODO: Ensure that all the following methods work as they are # supposed to. This is the interface that I'm adhering to class SimpleComponent < RFlow::Component