example/basic_extensions.rb in rflow-1.0.0a1 vs example/basic_extensions.rb in rflow-1.0.0a2
- old
+ new
@@ -1,11 +1,9 @@
# This will/should bring in available components and their schemas
require 'rflow/components'
require 'rflow/message'
-#RFlow::Configuration.add_available_data_schema RFlow::Message::Data::AvroSchema.new('Integer', long_integer_schema)
-
# Example of creating and registering a data extension
module SimpleDataExtension
# Use this to default/verify the data in data_object
def self.extended(base_data)
base_data.data_object
@@ -13,130 +11,34 @@
def my_method; end
end
RFlow::Configuration.add_available_data_extension('RFlow::Message::Data::Integer', SimpleDataExtension)
-
-
-# Example of creating and registering a new schema
-long_integer_schema = '{"type": "long"}'
-RFlow::Configuration.add_available_data_type('RFlow::Message::Data::Integer', 'avro', long_integer_schema)
-
-
-class RFlow::Components::GenerateIntegerSequence < RFlow::Component
- output_port :out
- output_port :even_odd_out
-
- def configure!(config)
- @start = config['start'].to_i
- @finish = config['finish'].to_i
- @step = config['step'] ? config['step'].to_i : 1
- # If interval seconds is not given, it will default to 0
- @interval_seconds = config['interval_seconds'].to_i
- end
-
- # Note that this uses the timer (sometimes with 0 interval) so as
- # not to block the reactor
- def run!
- timer = EM::PeriodicTimer.new(@interval_seconds) do
- message = RFlow::Message.new('RFlow::Message::Data::Integer')
- message.data.data_object = @start
- out.send_message message
- if @start % 2 == 0
- even_odd_out['even'].send_message message
- else
- even_odd_out['odd'].send_message message
- end
- even_odd_out.send_message message
-
- @start += @step
- timer.cancel if @start > @finish
- end
- end
-
-end
-
-class RFlow::Components::Replicate < RFlow::Component
- input_port :in
- output_port :out
- output_port :errored
-
- def process_message(input_port, input_port_key, connection, message)
- puts "Processing message in Replicate"
- out.each do |connections|
- puts "Replicating"
- begin
- connections.send_message message
- rescue Exception => e
- puts "Exception #{e.message}"
- errored.send_message message
- end
- end
- end
-end
-
-puts "Before RubyProcFilter"
-class RFlow::Components::RubyProcFilter < RFlow::Component
- input_port :in
- output_port :filtered
- output_port :dropped
- output_port :errored
-
-
- def configure!(config)
- @filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }")
- end
-
- def process_message(input_port, input_port_key, connection, message)
- puts "Processing message in RubyProcFilter"
- begin
- if @filter_proc.call(message)
- filtered.send_message message
- else
- dropped.send_message message
- end
- rescue Exception => e
- puts "Attempting to send message to errored #{e.message}"
- errored.send_message message
- end
- end
-end
-
-puts "Before FileOutput"
class RFlow::Components::FileOutput < RFlow::Component
attr_accessor :output_file_path, :output_file
input_port :in
def configure!(config)
self.output_file_path = config['output_file_path']
self.output_file = File.new output_file_path, 'w+'
end
- #def run!; end
-
def process_message(input_port, input_port_key, connection, message)
- puts "About to output to a file #{output_file_path}"
output_file.puts message.data.data_object.inspect
output_file.flush
end
-
def cleanup
output_file.close
end
-
end
-# TODO: Ensure that all the following methods work as they are
-# supposed to. This is the interface that I'm adhering to
class SimpleComponent < RFlow::Component
input_port :in
output_port :out
def configure!(config); end
def run!; end
def process_message(input_port, input_port_key, connection, message); end
def shutdown!; end
def cleanup!; end
end
-
-