lib/omf_oml/endpoint.rb in omf_oml-1.0.0 vs lib/omf_oml/endpoint.rb in omf_oml-1.1.0
- old
+ new
@@ -28,10 +28,11 @@
end
end
def initialize(port = 5000, host = "0.0.0.0")
require 'socket'
+ debug "OML client listening on #{port}"
@serv = TCPServer.new(host, port)
@running = false
@on_new_stream_procs = {}
end
@@ -106,11 +107,11 @@
end
private
def parse_header(socket, &reportStreamProc)
while (l = socket.gets.strip)
- #puts "H>> '#{l}'"
+ #puts "H>> '#{l}'"
return if l.length == 0
key, *value = l.split(':')
if (key == 'schema')
parse_schema(value.join(':'))
@@ -124,33 +125,68 @@
def parse_schema(desc)
debug "SCHEMA: #{desc}"
els = desc.split(' ')
#puts "ELS: #{els.inspect}"
index = els.shift.to_i - 1
+ if (index < 0)
+ # meta data - ignore
+ return
+ end
+ if @streams[index]
+ warn "Schema '#{index}' already defined"
+ return
+ end
sname = els.shift
schema_desc = els.collect do |el|
name, type = el.split(':')
+ if (type == 'blob'); type = 'blob64' end # base64 encode blob
{:name => name.to_sym, :type => type.to_sym}
end
schema_desc.insert(0, {:name => :oml_ts, :type => :double})
schema_desc.insert(1, {:name => :sender_id, :type => :string})
- schema_desc.insert(2, {:name => :oml_seq_no, :type => :integer})
+ schema_desc.insert(2, {:name => :oml_seq_no, :type => :integer})
schema = OMF::OML::OmlSchema.create(schema_desc)
+ debug "New schema '#{sname}(#{index})' with schema '#{schema}'"
@streams[index] = tuple = OmlTuple.new(sname, schema)
@endpoint.report_new_stream(sname, tuple)
end
+ def parse_meta_row(row)
+ #puts "META>> #{row.inspect}"
+ unless row.length == 5
+ warn "Received mis-formatted META tuple - #{row}"
+ return
+ end
+ key = "#{row[2]}/#{row[3]}".downcase
+ value = row[4]
+ debug "META: '#{key}' - '#{value}'"
+ case key
+ when './schema'
+ parse_schema(value)
+ else
+ warn "Unknwon META type '#{key}'"
+ end
+ end
+
def parse_rows(socket)
sender_id = @header['sender-id'] || 'unknown'
while (l = socket.gets)
return if l.length == 0
els = l.strip.split("\t")
- #puts "R>> '#{els.inspect}'"
- index = els.delete_at(1).to_i - 1
+ #puts "R(#{sender_id})>> '#{els.inspect}'"
+ index = els.delete_at(1).to_i
+ if (index == 0)
+ parse_meta_row(els)
+ next
+ end
+ unless stream = @streams[index - 1]
+ warn "Receiving tuples for unknown schema index '#{index}'"
+ next
+ end
els.insert(1, sender_id)
- row = @streams[index].parse_tuple(els)
+ row = @streams[index - 1].parse_tuple(els)
end
end
end # OMLEndpoint
@@ -158,16 +194,18 @@
end
if $0 == __FILE__
require 'omf_oml/table'
- ep = OMF::OML::OmlEndpoint.new(3000)
- toml = OMF::OML::OmlTable.new('oml', [[:x], [:y]], :max_size => 20)
- ep.on_new_stream() do |s|
- puts "New stream: #{s}"
- s.on_new_vector() do |v|
- puts "New vector: #{v.select(:oml_ts, :value).join('|')}"
- toml.add_row(v.select(:oml_ts, :value))
+ require 'omf_base/lobject'
+ OMF::Base::Loggable.init_log 'endpoint'
+
+ ep = OMF::OML::OmlEndpoint.new(3003)
+ ep.on_new_stream() do |name, stream|
+ puts "New stream: #{name}-#{stream}"
+ table = stream.create_table(name + '_tbl', :max_size => 5)
+ table.on_content_changed do |action, change|
+ puts "TTTT > #{action} - #{change}"
end
end
ep.run(false)
end