require 'omf_oml/tuple' require 'omf_oml/table' module OMF::OML # Read the content of a table and feeds it out as a tuple store. # After creation of the object. The actual tuple feed is started # with a call to +run+. # class OmlSqlRow < OmlTuple attr_reader :row # # Create a representation of a row in a database. Can be used to fill a table. # # @param [String] sql_table_name - name of SQL table in respective SQL database # @param [OmlSchema] schema - the schema describing the tuple # @param [Sequel] db - Database # @param [Hash] opts: # - offset: Ignore first +offset+ rows. If negative or zero serve +offset+ rows initially # - limit: Number of rows to fetch each time [1000] # - check_interval: Interval in seconds when to check for new data. If 0, only run once. # - query_interval: Interval between consecutive queries when processing large result set. # def initialize(sql_table_name, schema, db, opts = {}) @sname = sql_table_name @schema = schema raise "Expected OmlSchema but got '#{schema.class}" unless schema.is_a? OmlSchema @db = db unless @offset = opts[:offset] @offset = 0 end @limit = opts[:limit] @limit = 1000 unless @limit @check_interval = opts[:check_interval] @check_interval = 0 unless @check_interval @query_interval = opts[:query_interval] @on_new_vector_proc = {} super opts[:name] || sql_table_name, schema end # Return a specific element of the vector identified either # by it's name, or its col index # def [](name_or_index) if name_or_index.is_a? Integer @row[@schema.name_at_index(name_or_index)] else unless @row.key? name_or_index raise "Unknown column name '#{name_or_index}'" end @row[name_or_index] end end # Return the elements of the row as an array using the # associated schema or 'schema' if non-nil. # def to_a(schema = nil) a = (schema || @schema).hash_to_row(@row) # don't need type conversion as sequel is doing this for us end # Return an array including the values for the names elements # given as parameters. # def select(*col_names) r = @row col_names.collect do |n| unless @row.key? n raise "Unknown column name '#{n}'" end @row[n] end end def ts self[:oml_ts_server] end def seq_no self[:oml_seq] end # Register a proc to be called when a new tuple arrived # on this stream. # def on_new_tuple(key = :_, &proc) if proc @on_new_vector_proc[key] = proc else @on_new_vector_proc.delete key end run() unless @on_new_vector_proc.empty? end # Return a table which will capture the content of this tuple stream. # # @param [string] name - Name to use for returned table # @param [Hash] opts Options to be passed on to Table constructor # @opts [boolean] opts :include_oml_internals If true will also include OML header columns # @opts [OmlSchema] opts :schema use specific schema for table (Needs to be a subset of the tuples schema) # def to_stream(opts = {}, &block) unless schema = opts.delete(:schema) include_oml_internals = (opts[:include_oml_internals] != false) schema = self.schema.clone(!include_oml_internals) if include_oml_internals # replace sender_id by sender ... see _run_once schema.replace_column_at 0, :oml_sender end end self.on_new_tuple(rand()) do |t| #v = t.to_a(schema) v = t.row block.arity == 1 ? block.call(v) : block.call(v, schema) end schema end # Return a table which will capture the content of this tuple stream. # # @param [string] name - Name to use for returned table # @param [Hash] opts Options to be passed on to Table constructor # @opts [boolean] opts :include_oml_internals If true will also include OML header columns # @opts [OmlSchema] opts :schema use specific schema for table (Needs to be a subset of the tuples schema) # def to_table(name = nil, opts = {}) unless name name = @sname end unless schema = opts.delete(:schema) include_oml_internals = (opts[:include_oml_internals] != false) schema = self.schema.clone(!include_oml_internals) if include_oml_internals # replace sender_id by sender ... see _run_once schema.replace_column_at 0, :oml_sender end end t = OMF::OML::OmlTable.new(name, schema, opts) #puts ">>>>SCHEMA>>> #{schema.inspect}" self.on_new_tuple(t) do |v| r = v.to_a(schema) #puts r.inspect t.add_row(r) end t end protected def run(in_thread = true) return if @running if in_thread Thread.new do begin _run rescue Exception => ex error "Exception in OmlSqlRow: #{ex}" debug "Exception in OmlSqlRow: #{ex.backtrace.join("\n\t")}" end end else _run end end private def _run if @check_interval <= 0 while _run_once; end else @running = true while (@running) begin unless _run_once # All rows read, wait a bit for news to show up sleep @check_interval end rescue Exception => ex warn ex debug "\t", ex.backtrace.join("\n\t") end end end end # Run a query on database an serve all rows found one at a time. # Return true if there might be more rows in the database def _run_once row_cnt = 0 t = table_name = @sname if (@offset < 0) cnt = @db[table_name.to_sym].count() @offset = cnt + @offset # @offset was negative here debug("Initial offset #{@offset} in '#{table_name}' with #{cnt} rows") @offset = 0 if @offset < 0 end @db["SELECT _senders.name as oml_sender, #{t}.* FROM #{t} INNER JOIN _senders ON (_senders.id = #{t}.oml_sender_id) LIMIT #{@limit} OFFSET #{@offset};"].each do |r| @row = r @on_new_vector_proc.each_value do |proc| proc.call(self) end row_cnt += 1 end @offset += row_cnt debug "Read #{row_cnt} (total #{@offset}) rows from '#{@sname}'" if row_cnt > 0 if more_to_read = row_cnt >= @limit # there could be more to read sleep @query_interval if @query_interval # don't hammer database end more_to_read end end # OmlSqlRow end if $0 == __FILE__ require 'omf_oml/table' ep = OMF::OML::OmlSqlSource.new('brooklynDemo.sq3') ep.on_new_stream() do |s| puts ">>>>>>>>>>>> New stream #{s.stream_name}: #{s.names.join(', ')}" case s.stream_name when 'wimaxmonitor_wimaxstatus' select = [:oml_ts_server, :sender_hostname, :frequency, :signal, :rssi, :cinr, :avg_tx_pw] when 'GPSlogger_gps_data' select = [:oml_ts_server, :oml_sender_id, :lat, :lon] end s.on_new_vector() do |v| puts "New vector(#{s.stream_name}): #{v.select(*select).join('|')}" end end ep.run() end