example/basic_extensions.rb in rflow-0.0.5 vs example/basic_extensions.rb in rflow-1.0.0a1
- old
+ new
@@ -23,11 +23,11 @@
class RFlow::Components::GenerateIntegerSequence < RFlow::Component
output_port :out
output_port :even_odd_out
-
+
def configure!(config)
@start = config['start'].to_i
@finish = config['finish'].to_i
@step = config['step'] ? config['step'].to_i : 1
# If interval seconds is not given, it will default to 0
@@ -35,21 +35,21 @@
end
# Note that this uses the timer (sometimes with 0 interval) so as
# not to block the reactor
def run!
- timer = EM::PeriodicTimer.new(@interval_seconds) do
+ timer = EM::PeriodicTimer.new(@interval_seconds) do
message = RFlow::Message.new('RFlow::Message::Data::Integer')
message.data.data_object = @start
out.send_message message
if @start % 2 == 0
even_odd_out['even'].send_message message
else
even_odd_out['odd'].send_message message
end
even_odd_out.send_message message
-
+
@start += @step
timer.cancel if @start > @finish
end
end
@@ -57,11 +57,11 @@
class RFlow::Components::Replicate < RFlow::Component
input_port :in
output_port :out
output_port :errored
-
+
def process_message(input_port, input_port_key, connection, message)
puts "Processing message in Replicate"
out.each do |connections|
puts "Replicating"
begin
@@ -83,11 +83,11 @@
def configure!(config)
@filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }")
end
-
+
def process_message(input_port, input_port_key, connection, message)
puts "Processing message in RubyProcFilter"
begin
if @filter_proc.call(message)
filtered.send_message message
@@ -110,21 +110,21 @@
self.output_file_path = config['output_file_path']
self.output_file = File.new output_file_path, 'w+'
end
#def run!; end
-
+
def process_message(input_port, input_port_key, connection, message)
puts "About to output to a file #{output_file_path}"
output_file.puts message.data.data_object.inspect
output_file.flush
end
-
+
def cleanup
output_file.close
end
-
+
end
# TODO: Ensure that all the following methods work as they are
# supposed to. This is the interface that I'm adhering to
class SimpleComponent < RFlow::Component