lib/omf_oml/endpoint.rb in omf_oml-1.0.0 vs lib/omf_oml/endpoint.rb in omf_oml-1.1.0

- old
+ new

@@ -28,10 +28,11 @@ end end def initialize(port = 5000, host = "0.0.0.0") require 'socket' + debug "OML client listening on #{port}" @serv = TCPServer.new(host, port) @running = false @on_new_stream_procs = {} end @@ -106,11 +107,11 @@ end private def parse_header(socket, &reportStreamProc) while (l = socket.gets.strip) - #puts "H>> '#{l}'" + #puts "H>> '#{l}'" return if l.length == 0 key, *value = l.split(':') if (key == 'schema') parse_schema(value.join(':')) @@ -124,33 +125,68 @@ def parse_schema(desc) debug "SCHEMA: #{desc}" els = desc.split(' ') #puts "ELS: #{els.inspect}" index = els.shift.to_i - 1 + if (index < 0) + # meta data - ignore + return + end + if @streams[index] + warn "Schema '#{index}' already defined" + return + end sname = els.shift schema_desc = els.collect do |el| name, type = el.split(':') + if (type == 'blob'); type = 'blob64' end # base64 encode blob {:name => name.to_sym, :type => type.to_sym} end schema_desc.insert(0, {:name => :oml_ts, :type => :double}) schema_desc.insert(1, {:name => :sender_id, :type => :string}) - schema_desc.insert(2, {:name => :oml_seq_no, :type => :integer}) + schema_desc.insert(2, {:name => :oml_seq_no, :type => :integer}) schema = OMF::OML::OmlSchema.create(schema_desc) + debug "New schema '#{sname}(#{index})' with schema '#{schema}'" @streams[index] = tuple = OmlTuple.new(sname, schema) @endpoint.report_new_stream(sname, tuple) end + def parse_meta_row(row) + #puts "META>> #{row.inspect}" + unless row.length == 5 + warn "Received mis-formatted META tuple - #{row}" + return + end + key = "#{row[2]}/#{row[3]}".downcase + value = row[4] + debug "META: '#{key}' - '#{value}'" + case key + when './schema' + parse_schema(value) + else + warn "Unknwon META type '#{key}'" + end + end + def parse_rows(socket) sender_id = @header['sender-id'] || 'unknown' while (l = socket.gets) return if l.length == 0 els = l.strip.split("\t") - #puts "R>> '#{els.inspect}'" - index = els.delete_at(1).to_i - 1 + #puts "R(#{sender_id})>> '#{els.inspect}'" + index = els.delete_at(1).to_i + if (index == 0) + parse_meta_row(els) + next + end + unless stream = @streams[index - 1] + warn "Receiving tuples for unknown schema index '#{index}'" + next + end els.insert(1, sender_id) - row = @streams[index].parse_tuple(els) + row = @streams[index - 1].parse_tuple(els) end end end # OMLEndpoint @@ -158,16 +194,18 @@ end if $0 == __FILE__ require 'omf_oml/table' - ep = OMF::OML::OmlEndpoint.new(3000) - toml = OMF::OML::OmlTable.new('oml', [[:x], [:y]], :max_size => 20) - ep.on_new_stream() do |s| - puts "New stream: #{s}" - s.on_new_vector() do |v| - puts "New vector: #{v.select(:oml_ts, :value).join('|')}" - toml.add_row(v.select(:oml_ts, :value)) + require 'omf_base/lobject' + OMF::Base::Loggable.init_log 'endpoint' + + ep = OMF::OML::OmlEndpoint.new(3003) + ep.on_new_stream() do |name, stream| + puts "New stream: #{name}-#{stream}" + table = stream.create_table(name + '_tbl', :max_size => 5) + table.on_content_changed do |action, change| + puts "TTTT > #{action} - #{change}" end end ep.run(false) end