lib/rflow/components/clock.rb in rflow-1.3.0 vs lib/rflow/components/clock.rb in rflow-1.3.1
- old
+ new
@@ -1,44 +1,95 @@
class RFlow
+ # @!parse
+ # class Message
+ # # Clock messages.
+ # module Clock
+ # # Message emitted by {RFlow::Components::Clock}. Of course the real class is {RFlow::Message}
+ # # with type +RFlow::Message::Clock::Tick+.
+ # class Tick
+ # # @!attribute name
+ # # The name of the clock.
+ # # @return [String]
+ #
+ # # @!attribute timestamp
+ # # The timestamp of the tick, in milliseconds from epoch.
+ # # @return [Integer]
+ #
+ # # Just here to force Yard to create documentation.
+ # # @!visibility private
+ # def initialize; end
+ # end
+ # end
+ # end
+
+ # Components.
module Components
+ # A clock. It ticks every _n_ seconds. Get it?
+ #
+ # Accepts config parameters:
+ # - +name+ - name of the clock, to disambiguate more than one
+ # - +tick_interval+ - how long to wait between ticks
+ #
+ # Emits {RFlow::Message}s whose internal type is {RFlow::Message::Clock::Tick}.
class Clock < Component
+ # @!visibility private
module Tick
+ # @!visibility private
SCHEMA_DIRECTORY = ::File.expand_path(::File.join(::File.dirname(__FILE__), '..', '..', '..', 'schema'))
+ # @!visibility private
SCHEMA_FILES = {'tick.avsc' => 'RFlow::Message::Clock::Tick'}
SCHEMA_FILES.each do |file_name, data_type_name|
schema_string = ::File.read(::File.join(SCHEMA_DIRECTORY, file_name))
RFlow::Configuration.add_available_data_type data_type_name, 'avro', schema_string
end
+ # @!visibility private
module Extension
+ # @!visibility private
def self.extended(base_data); base_data.data_object ||= {}; end
+ # @!visibility private
def name; data_object['name']; end
+ # @!visibility private
def name=(new_name); data_object['name'] = new_name; end
+ # @!visibility private
def timestamp; data_object['timestamp']; end
+ # @!visibility private
def timestamp=(new_ts); data_object['timestamp'] = new_ts; end
end
RFlow::Configuration.add_available_data_extension('RFlow::Message::Clock::Tick', Extension)
end
+ # @!attribute [r] tick_port
+ # Outputs {RFlow::Message::Clock::Tick} messages.
+ # @return [Component::OutputPort]
output_port :tick_port
+ # Default configuration.
DEFAULT_CONFIG = {
'name' => 'Clock',
'tick_interval' => 1
}
+ # @!visibility private
attr_reader :config, :tick_interval
+ # RFlow-called method at startup.
+ # @param config [Hash] configuration from the RFlow config file
+ # @return [void]
def configure!(config)
@config = DEFAULT_CONFIG.merge config
@tick_interval = Float(@config['tick_interval'])
end
+ # @!visibility private
def clock_name; config['name']; end
+ # RFlow-called method at startup.
+ # @return [void]
def run!
@timer = EventMachine::PeriodicTimer.new(tick_interval) { tick }
end
+ # @!visibility private
def tick
tick_port.send_message(RFlow::Message.new('RFlow::Message::Clock::Tick').tap do |m|
m.data.name = clock_name
m.data.timestamp = Integer(Time.now.to_f * 1000) # ms since epoch
end)