#!/usr/bin/env ruby

###########################################################################
# Copyright (c) 2017 Alan Yorinks - All Rights Reserved.
#
# This file is part of Ruby-Banyan.
#
# Ruby Banyan is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# Version 3 as published by the Free Software Foundation; either
# or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
# You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
# along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# banyan_base.rb
#############################################################################

require 'rubygems'
require 'ffi-rzmq'
require 'socket'
require 'msgpack'
require 'optparse'

###################################################################
# This is the banyan backplane for ruby. It must be started before
# any other banyan components are invoked.
###################################################################

# noinspection RubyResolve
class BackPlane
  # This is the initializer for the Banyan BackPlane class. The class must
  # be instantiated before starting any other Banyan components.
  #
  # Constructing an instance prints a start-up banner, binds both ZeroMQ
  # sockets and then runs the forwarding loop; it only leaves that loop by
  # exiting the process (Control-C).
  #
  # :param subscriber_port: subscriber IP port number (String or Integer)
  # :param publisher_port: publisher IP port number (String or Integer)
  # :param process_name: name to appear on the console for this backplane
  # :param loop_time: event loop idle timer, in seconds
  # :raises RuntimeError: when either socket cannot be bound
  def initialize(subscriber_port: '43125', publisher_port: '43124',
                 process_name: 'Backplane', loop_time: 0.001)
    # Ports are normalised to strings so Integer ports are accepted too.
    @subscriber_port = subscriber_port.to_s
    @publisher_port = publisher_port.to_s
    @process_name = process_name
    @loop_time = loop_time
    @back_plane_ip_address = local_ip_address

    print_banner

    # establish the zeromq sub and pub sockets
    @context = ZMQ::Context.create
    @subscriber = @context.socket ZMQ::SUB
    @publisher = @context.socket ZMQ::PUB

    # subscribe to all messages
    @subscriber.setsockopt ZMQ::SUBSCRIBE, ''

    # this looks backwards, but is correct. the backplane
    # accepts messages and re-sends them.
    bind_socket(@subscriber, @publisher_port)
    bind_socket(@publisher, @subscriber_port)

    run
  end

  # Close both sockets, terminate the ZeroMQ context and exit the process.
  def clean_up
    release_sockets
    exit
  end

  private

  # Pick the address to bind to: the first non-loopback IPv4 address of this
  # host, falling back to an IPv4 loopback address and finally to the literal
  # '127.0.0.1'. Selecting by address family rather than by list position
  # avoids a nil entry or an IPv6 address ending up in the tcp:// endpoint.
  def local_ip_address
    addresses = Socket.ip_address_list
    chosen = addresses.find { |addr| addr.ipv4? && !addr.ipv4_loopback? } ||
             addresses.find(&:ipv4?)
    chosen ? chosen.ip_address : '127.0.0.1'
  end

  # Print the start-up banner describing this backplane.
  def print_banner
    puts '************************************************************'
    puts "#{@process_name} BackPlane on IP address: #{@back_plane_ip_address}"
    puts "Subscriber Port = #{@subscriber_port}"
    puts "Publisher Port = #{@publisher_port}"
    puts "Loop Time = #{@loop_time} seconds"
    puts '************************************************************'
  end

  # Bind +socket+ to +port+ on the backplane address. On failure the ZeroMQ
  # resources are released and a RuntimeError carrying the ZeroMQ error text
  # is raised, instead of continuing with a backplane that cannot relay.
  def bind_socket(socket, port)
    endpoint = "tcp://#{@back_plane_ip_address}:#{port}"
    return if ZMQ::Util.resultcode_ok?(socket.bind(endpoint))

    # Read the error text before any further ZeroMQ call can replace it.
    reason = ZMQ::Util.error_string
    release_sockets
    raise "Unable to bind #{endpoint}: #{reason}"
  end

  # Close both sockets and terminate the ZeroMQ context.
  def release_sockets
    @publisher.close
    @subscriber.close
    @context.terminate
  end

  # Run the forwarding loop until Control-C is pressed, then clean up.
  def run
    loop { forward_pending_messages }
  rescue Interrupt
    puts 'Control C detected - bye bye.'
    clean_up
  end

  # Perform one non-blocking receive. A received message is re-published
  # unchanged; otherwise the loop idles for +loop_time+ seconds and any
  # error other than EAGAIN ("nothing to read") has its errno printed.
  def forward_pending_messages
    frames = []
    rc = @subscriber.recv_strings(frames, ZMQ::DONTWAIT)
    if ZMQ::Util.resultcode_ok?(rc)
      @publisher.send_strings(frames, ZMQ::DONTWAIT)
      return
    end

    # Capture errno before sleeping so an intervening call cannot change it.
    errno = ZMQ::Util.errno
    sleep(@loop_time)
    puts errno unless errno == ZMQ::EAGAIN
  end
end

# Command line defaults; every value is a string until parsed below.
options = { publisher_port: '43124', subscriber_port: '43125',
            process_name: 'Unknown', loop_time: '0.01' }

optparse = OptionParser.new do |opts|
  opts.on('-p', '--p PUBLISHER PORT', 'Publisher Port Number') do |publisher_port|
    options[:publisher_port] = publisher_port
  end

  opts.on('-s', '--s SUBSCRIBER PORT', 'Subscriber Port Number') do |subscriber_port|
    options[:subscriber_port] = subscriber_port
  end

  opts.on('-n', '--n Process Name', 'Name Of This Banyan Component') do |process_name|
    options[:process_name] = process_name
  end

  opts.on('-t', '--t Loop Delay Time', 'Time In Seconds') do |loop_time|
    options[:loop_time] = loop_time
  end

  opts.on('-h', '--help', 'Display this screen') do
    puts opts
    exit
  end
end

# Report bad command lines with the usage text instead of a backtrace.
begin
  optparse.parse!
rescue OptionParser::ParseError => e
  abort "#{e.message}\n#{optparse}"
end

# Float() rejects non-numeric input, which to_f would silently turn into 0.0.
begin
  options[:loop_time] = Float(options[:loop_time])
rescue ArgumentError, TypeError
  abort "Invalid loop time: #{options[:loop_time].inspect}\n#{optparse}"
end

BackPlane.new(subscriber_port: options[:subscriber_port],
              publisher_port: options[:publisher_port],
              process_name: options[:process_name],
              loop_time: options[:loop_time])