require 'roby/log'
require 'roby/log/data_stream'
require 'roby/distributed/communication'
require 'roby/distributed/drb'
require 'tempfile'
module Roby
module Log
class Server
# Default port, used both by Server.enable_discovery and by
# Server#initialize when no port is given
RING_PORT = 48904
class << self
# The Logger object used for the server's log messages
attr_accessor :logger
end
# Class-level logger: writes to STDERR at INFO level, one line per
# message formatted as "<time> <progname> <message>"
@logger = Logger.new(STDERR)
@logger.level = Logger::INFO
@logger.progname = "Roby server"
# NOTE(review): Time#to_hms is not defined in this file -- presumably a
# Roby extension, confirm
@logger.formatter = lambda { |severity, time, progname, msg| "#{time.to_hms} #{progname} #{msg}\n" }
# NOTE(review): presumably provides Server.info / Server.warn /
# Server.debug by forwarding to #logger -- confirm against Logger::Forward
extend Logger::Forward
# Class-level mutex used by Server.synchronize
@mutex = Mutex.new
# Runs the given block while holding the class-level mutex. The
# discovery code uses it to protect @available_servers.
def self.synchronize
@mutex.synchronize { yield }
end
# Returns a copy of the set of servers that have been discovered by the
# discovery mechanism at this time. The result is an empty array when
# discovery has never been enabled.
#
# See also enable_discovery and disable_discovery
def self.available_servers
  synchronize do
    # @available_servers only exists once enable_discovery has run
    (@available_servers || []).dup
  end
end
# Start an asynchronous discovery mechanism. This will fill the
# #available_servers set of servers. +broadcast+ is an array of
# addresses on which discovery should be done and +period+ is the
# discovery period in seconds
#
# Raises ArgumentError if discovery is already enabled. Returns the
# discovery thread.
def self.enable_discovery(broadcast, port = RING_PORT, period = 10)
  if @discovery_thread
    raise ArgumentError, "already enabled discovery"
  end

  finger = Rinda::RingFinger.new(broadcast, port)
  # Servers which answered during the current period
  discovered_displays = Array.new
  @available_servers = Array.new

  @discovery_thread = Thread.new do
    begin
      loop do
        finger.lookup_ring(period) do |remote|
          synchronize do
            @available_servers << remote unless @available_servers.include?(remote)
            # Keep the per-period list free of duplicates as well: it
            # becomes the published list at the end of the period
            discovered_displays << remote unless discovered_displays.include?(remote)
          end
        end
        sleep(period)

        synchronize do
          # The servers seen during this period become the available
          # set; the previous set is recycled as the next scratch list
          @available_servers, discovered_displays = discovered_displays, @available_servers
          discovered_displays.clear
        end
      end
    rescue Interrupt
      # Raised by disable_discovery to stop the thread
    end
  end
end
# Stops the discovery thread if it is running, and empties the list of
# available servers. The method also removes itself from
# Roby.engine.finalizers.
def self.disable_discovery
  Roby.engine.finalizers.delete(method(:disable_discovery))

  running = @discovery_thread
  return unless running

  # The discovery loop rescues Interrupt and terminates
  running.raise Interrupt, "quitting"
  running.join
  @discovery_thread = nil
  synchronize { @available_servers.clear }
end
# A stream_id => Set(remote_server, ...) hash containing the
# set of subscribed remote peers for each stream. Accessing an unknown
# stream id creates an empty set for it.
attr_reader :subscriptions
# A remote_server => [queue, thread] hash which contains
# the set of connection parameters for each connected peer: the queue
# of pending calls and the thread which pushes them to the peer
attr_reader :connections
# The Distributed::RingServer object which publishes this display
# server on the network
attr_reader :ring_server
# Default value for #polling_timeout
POLLING_TIMEOUT = 0.1
# Time, in seconds, the polling thread sleeps when no stream had
# anything to process
attr_reader :polling_timeout
# Creates a display server published through a ring server on +port+,
# and starts the thread which polls the streams. +polling_timeout+ is
# the time, in seconds, the polling thread sleeps when idle.
def initialize(port = RING_PORT, polling_timeout = POLLING_TIMEOUT)
  # Set up the internal state before handing a reference to this
  # object to the ring server, so that a remote peer can never call
  # into a half-initialized server
  @mutex = Mutex.new
  @streams = Array.new
  @connections = Hash.new
  @subscriptions = Hash.new { |h, k| h[k] = Set.new }
  @polling_timeout = polling_timeout

  @ring_server = Distributed::RingServer.new(DRbObject.new(self), :port => port)
  @polling = Thread.new(&method(:polling))
end
# Runs the given block while holding this server's mutex, which
# protects the stream list, the connections and the subscriptions
def synchronize
@mutex.synchronize { yield }
end
# Registers +remote+ as a connected peer: a call queue and a pushing
# thread are created for it.
#
# Returns the description of the current streams, as an array of
# [class name, id, name, type] tuples
def connect(remote)
  synchronize do
    outbox = Queue.new
    connections[remote] = [outbox, pushing_loop(remote, outbox)]
    Server.info "#{remote.__drburi} connected"
  end

  streams.map { |stream| [stream.class.name, stream.id, stream.name, stream.type] }
end
# Stops the pushing thread of +remote+, if there is one, and waits for
# it to finish. The thread itself removes the connection and the
# subscriptions of +remote+ when it terminates.
def disconnect(remote)
  pusher = synchronize do
    _, worker = connections[remote]
    worker.raise Interrupt, "quitting" if worker
    worker
  end

  # Join outside of the lock: the pushing thread needs it to clean up
  pusher.join if pusher
  Server.info "#{remote.__drburi} disconnected"
end
# Polls all data sources and pushes the samples to the subscribed
# clients. Every stream is looked at on each pass. The thread sleeps
# for #polling_timeout seconds only when no stream had anything to do.
#
# Loops until Interrupt is raised in the calling thread (see #quit)
def polling
  loop do
    done_sth = false
    synchronize do
      @streams.each do |stream|
        # Do not fold this into `done_sth ||= ...`: that form stops
        # evaluating its right-hand side once true, so the streams
        # after the first busy one would be skipped for this pass
        if stream.reinit?
          Roby::Log::Server.info "reinitializing #{stream}"
          stream.reinit!
          reinit(stream.id)
          done_sth = true
        elsif stream.has_sample?
          if Roby::Log::Server.logger.debug?
            Roby::Log::Server.debug "new sample for #{stream} at #{stream.current_time.to_hms}"
          end
          push(stream.id, stream.current_time, stream.read)
          done_sth = true
        end
      end
    end

    sleep(polling_timeout) unless done_sth
  end
rescue Interrupt
  # Normal termination, requested by #quit
end
# Creates a new thread to send updates to +remote+
#
# The thread repeatedly drains +queue+ and sends the batch of calls to
# +remote+ through #demux. It stops after a batch containing a :quit
# call, on Interrupt, or when the peer cannot be reached. In all cases
# the subscriptions and the connection of +remote+ are then removed.
def pushing_loop(remote, queue)
  Thread.new do
    begin
      loop do
        batch = []
        batch << queue.pop until queue.empty?
        remote.demux(batch)
        break if batch.any? { |name, _| name == :quit }
      end
    rescue Interrupt
      # Raised by #disconnect to stop the thread
    rescue DRb::DRbConnError
      Server.warn "cannot communicate with #{remote.__drburi}. Assuming we are disconnected"
    ensure
      synchronize do
        # Remove all subscriptions for +remote+
        subscriptions.each_value { |subscribed| subscribed.delete(remote) }
        connections.delete(remote)
      end
    end
  end
end
private :pushing_loop
# Registers a new stream and announces it to every connected peer
# with an :added_stream call carrying the stream's class name, id,
# name and type
def added_stream(stream)
  synchronize do
    @streams << stream
    connections.each_value do |(outbox, _thread)|
      outbox.push([:added_stream, stream.class.name, stream.id, stream.name, stream.type])
    end
  end
end
# Stream +id+ has stopped: removes it from the stream list, announces
# the removal to every connected peer and drops its subscriptions.
#
# Raises ArgumentError if there is no stream with this id
def removed_stream(id)
  synchronize do
    # reject! returns nil when no element was removed. Only the
    # matching stream is deleted, the other ones are left untouched.
    unless @streams.reject! { |stream| stream.id == id }
      raise ArgumentError, "no such stream"
    end

    connections.each_value do |queue, _|
      queue.push [:removed_stream, id]
    end
    subscriptions.delete(id)
  end
end
# Returns the Roby::Log::DataStream objects describing the data
# sources available on this server. The returned array is a copy of
# the internal list, taken under the server lock.
def streams
  synchronize { @streams.dup }
end
# Make +remote+ subscribe to the stream identified by +id+. When
# new data is available, #push will be called on +remote+. The
# exact format of the pushed sample depends on the type of the
# stream
#
# If the stream already has data (as returned by its #read_all),
# #init is called on +remote+ with it right away.
#
# If the stream stops existing (because its source has quit for
# instance), #removed_stream will be called on the remote object
#
# Raises ArgumentError if there is no stream with this id
def subscribe(id, remote)
  synchronize do
    stream = @streams.find { |candidate| candidate.id == id }
    raise ArgumentError, "no such stream" unless stream

    subscriptions[id] << remote
    data = stream.read_all
    remote.init(id, data) if data
  end
end
# Removes the subscription of +remote+ on stream +id+. Does nothing
# if nobody ever subscribed to this stream.
def unsubscribe(id, remote)
  synchronize do
    # Check first: reading an unknown key would create an empty entry
    subscriptions[id].delete(remote) if subscriptions.has_key?(id)
  end
end
# Reinitializes the stream +id+: queues a :reinit call for every peer
# subscribed to it. It is used when a stream has been truncated (for
# instance when a log file has been restarted)
#
# This must be called in a synchronize { } block
def reinit(id)
  subscriptions[id].each do |remote|
    outbox, _thread = connections[remote]
    outbox.push [:reinit, id]
  end
end
private :reinit
# Pushes a new sample on stream +id+: queues a :push call, with the
# sample and its time, for every peer subscribed to the stream
#
# This must be called in a synchronize { } block
def push(id, time, sample)
  # Check first: reading an unknown key would create an empty entry
  return unless subscriptions.has_key?(id)

  subscriptions[id].each do |remote|
    outbox, _thread = connections[remote]
    outbox.push [:push, id, time, sample]
  end
end
private :push
# Stops the server: interrupts the polling thread and waits for it,
# then asks every pushing thread to quit and waits for each of them
def quit
  poller = @polling
  if poller
    poller.raise Interrupt, "quitting"
    poller.join
  end

  connections.each_value do |(outbox, pusher)|
    outbox.push [:quit]
    pusher.join
  end
end
end
# This class manages a data stream which is present remotely. Data is sent
# as-is over the network from a Server object to a Client object.
class RemoteStream < DataStream
# Creates the local representation of the remote stream +id+.
# +stream_model+ is the DataStream class used to decode the data;
# +name+ and +type+ are passed to the superclass. The data received
# is stored in a temporary file whose name is built from +name+ and
# +type+.
def initialize(stream_model, id, name, type)
  super(name, type)
  @id, @stream_model = id, stream_model
  @mutex = Mutex.new
  @pending_samples = []

  # Slashes cannot be part of the temporary file's base name
  basename = "remote_stream_#{name}_#{type}".tr("/", "_")
  @data_file = Tempfile.new(basename)
  @data_file.sync = true
end
# Runs the given block while holding this stream's mutex
def synchronize; @mutex.synchronize { yield } end
# The data file in which we save the data received so far. Each chunk
# is stored as a 4-byte big-endian length followed by the data itself.
attr_reader :data_file
# The DataStream class of the remote stream. This is used for
# decoding
attr_reader :stream_model
# Feeds +dec+ with all the data received so far: the data file is
# replayed from its beginning, the first chunk going through #init
# and the following ones through #decode. Does nothing more than
# logging when no data has been received yet.
def added_decoder(dec)
  synchronize do
    Server.info "#{self} initializing #{dec}"
    return if data_file.stat.size == 0

    # Chunks are stored as a 4-byte big-endian length, then the payload
    next_chunk = lambda do
      length = data_file.read(4).unpack("N").first
      data_file.read(length)
    end

    data_file.rewind
    init(next_chunk.call) { |sample| dec.process(sample) }
    dec.process(decode(next_chunk.call)) until data_file.eof?
    display
  end
end
# Discards everything received so far (data file content, pending
# samples and current time) and then calls the superclass
# implementation
def reinit!
  data_file.truncate(0)
  # truncate does not move the file offset: without rewinding, the
  # next write would leave a zero-filled hole at the start of the file
  data_file.rewind
  @pending_samples.clear
  @current_time = nil
  super
end
# Called when new data is available: updates the time range, queues
# the sample for #read and appends the data to the data file
def push(time, data)
  Server.info "#{self} got #{data.bytesize} bytes of data at #{time.to_hms}"
  synchronize do
    @range[0] ||= time
    @range[1] = time
    @current_time ||= time
    # Newest sample first: #read takes the samples from the other end
    @pending_samples.unshift [time, data]
    # The length prefix must count bytes, as the chunks are read back
    # with IO#read(length)
    data_file << [data.bytesize].pack("N") << data
  end
end
# Time of the sample last returned by #read. #push also sets it to the
# time of the pushed sample when it is not set yet.
attr_reader :current_time
# Returns the time of the next sample #read will return, or nil if
# there is no pending sample
def next_time
  synchronize do
    # Test the queue directly: #has_sample? takes the same
    # non-reentrant mutex and cannot be called from here.
    # #push inserts at the front and #read pops from the back, so the
    # next sample is the last element.
    @pending_samples.last.first unless @pending_samples.empty?
  end
end
# Same as the superclass implementation, but evaluated while holding
# the stream's mutex (#push updates @range under that mutex)
def range
synchronize { super }
end
# True if at least one sample received through #push has not been
# read yet
def has_sample?
  synchronize { !@pending_samples.empty? }
end
# Returns the data of the oldest pending sample and makes its time
# the current time. Both are nil when there is no pending sample. A
# pending reinitialization is performed first.
def read
  reinit! if reinit?

  entry = @pending_samples.pop
  @current_time, payload = entry
  payload
end
# Initializes the stream with +data+: the data is written as the
# first chunk of the data file and then given, along with the block,
# to stream_model.init
def init(data, &block)
  Server.info "#{self} initializing with #{data.bytesize} bytes of data"
  data_file.rewind
  # The length prefix must count bytes, as the chunks are read back
  # with IO#read(length)
  data_file << [data.bytesize].pack("N") << data
  stream_model.init(data, &block)
end
# Decodes one chunk of raw data by delegating to stream_model.decode
def decode(data)
stream_model.decode(data)
end
end
class Client
# The remote display server, as given to #initialize. It is reset to
# nil by #quit.
attr_reader :server
# Creates a client of the display server +server+ and connects to it
# right away (see #connect)
def initialize(server)
  @pending = {}
  @server = server
  connect
end
# Returns the RemoteStream objects known to this client, i.e. the
# values of the id => stream table filled by #added_stream
def streams
@streams.values
end
# Called when the server announces a new stream. A RemoteStream is
# created for it, using the class named +klass_name+ as stream model,
# and registered under +id+. The superclass implementation, if any,
# is called afterwards.
def added_stream(klass_name, id, name, type)
  # Best effort: try to load the file which is expected to define the
  # stream class, and go on if there is no such file.
  # NOTE(review): the file name comes from the remote server -- make
  # sure the server is trusted
  begin
    require klass_name.underscore
  rescue LoadError
  end

  model = klass_name.constantize
  @streams[id] = RemoteStream.new(model, id, name, type)
  super if defined? super
end
# Called when the server announces that stream +id+ has been removed.
# The stream is forgotten, then the superclass implementation, if any,
# is called.
def removed_stream(id)
@streams.delete(id)
super if defined? super
end
# NOTE(review): @last_update is never assigned in the visible code --
# confirm whether this reader is still used
attr_reader :last_update
# NOTE(review): not referenced in the visible code; presumably a
# duration in seconds -- confirm
MIN_DISPLAY_DURATION = 5
# Called by the server with a batch of calls, each being an array of
# the form [method_name, *arguments]. Every call is dispatched to the
# corresponding public method of this object, then all pending
# samples of all streams are consumed.
def demux(calls)
  calls.each do |args|
    # The method names come from the remote side: dispatch to public
    # methods only, so that a batch can never reach private methods
    # such as Kernel#system or Kernel#eval
    public_send(*args)
  end

  streams.each do |s|
    while s.has_sample?
      s.synchronize do
        s.advance
      end
    end
  end
  # Presumably throttles the server's pushing thread, which waits for
  # this call to return -- NOTE(review): confirm
  sleep(0.5)
end
# Asks the server to send the data of +stream+ to this client. The
# client is passed to the server as a DRbObject reference.
def subscribe(stream)
@server.subscribe(stream.id, DRbObject.new(self))
end
# Asks the server to stop sending the data of +stream+ to this client
def unsubscribe(stream)
@server.unsubscribe(stream.id, DRbObject.new(self))
end
# Called by the server with the initial data of stream +id+. The
# stream is initialized with it while holding the stream's lock, and
# every sample it yields is given to all the decoders of the stream.
def init(id, data)
  stream = @streams[id]
  Server.info "initializing #{stream}"

  stream.synchronize do
    stream.init(data) do |sample|
      stream.decoders.each { |decoder| decoder.process(sample) }
    end
  end
end
# Called by the server when stream +id+ has been reinitialized. Only
# sets the stream's reinit flag here.
# NOTE(review): presumably RemoteStream#read then performs the actual
# reinit! through reinit? -- confirm against DataStream
def reinit(id)
@streams[id].reinit = true
end
# Called by the server when a new sample is available on stream +id+.
# The sample is handed to the matching RemoteStream.
def push(id, time, data)
@streams[id].push(time, data)
end
# True while the stream table exists, i.e. after #connect and until
# #disconnect or #quit
def connected?; !!@streams end
# Connects to the server: registers this client on it, creates a
# RemoteStream for each stream the server currently has and installs
# a finalizer which disconnects from the server.
#
# Raises ArgumentError if the client is already connected; the
# existing connection is left untouched in that case. If anything
# fails during the connection itself, the client is disconnected and
# the error is raised again.
def connect
  # Keep this check out of the rescued section below: a redundant call
  # must not tear down the connection which is already established
  if connected?
    raise ArgumentError, "already connected"
  end

  begin
    @streams = Hash.new
    server.connect(DRbObject.new(self)).
      each do |klass, id, name, type|
        added_stream(klass, id, name, type)
      end
    ObjectSpace.define_finalizer(self, Client.remote_streams_finalizer(server, DRbObject.new(self)))
  rescue Exception
    # Undo the partial connection, whatever the error, and let the
    # error propagate
    disconnect
    raise
  end
end
# Builds the finalizer installed by #connect: a proc which
# disconnects +drb_object+ from +server+. Connection errors are
# ignored, any other error is printed on STDERR.
def self.remote_streams_finalizer(server, drb_object)
  # A plain proc (not a lambda) is needed: it is called with an
  # argument that is ignored here
  cleanup = Proc.new do
    begin
      server.disconnect(drb_object)
    rescue DRb::DRbConnError
      # The server is already gone, nothing to do
    rescue Exception => error
      STDERR.puts error.full_message
    end
  end
  cleanup
end
# Forgets all the streams and tells the server that this client is
# disconnecting. A connection error while contacting the server is
# ignored.
def disconnect
@streams = nil
server.disconnect(DRbObject.new(self))
rescue DRb::DRbConnError
end
# Forgets both the streams and the server. Nothing is sent to the
# server.
def quit
  @streams = @server = nil
end
end
end
end
# When run as a script: discovers the display servers reachable from
# localhost and lists the streams each of them offers
if $0 == __FILE__
  include Roby
  include Roby::Log

  # First find the available servers
  STDERR.puts "Finding available servers ..."
  DRb.start_service
  # enable_discovery expects an array of broadcast addresses
  Server.enable_discovery ['localhost']
  sleep(0.5)

  Server.available_servers.each do |server|
    # Client is the class which connects to a display server and
    # mirrors its streams
    remote = Client.new(server)
    puts "#{server.__drburi}:"
    remote.streams.each do |s|
      puts " #{s.name} [#{s.type}]"
    end
  end
end