require 'singleton' module Paho class Proxy include Singleton QOS = 2 LOCAL_MQTT = 'tcp://localhost:1883' attr_accessor :topic attr_accessor :qos attr_accessor :processor attr_reader :clients class << self def subscribe(processor, opts = {}) self.instance.subscribe(processor, opts) end def publisher self.instance end def publish(topic, payload) self.instance.publish(topic, payload) end def disconnect! self.instance.disconnect! end end def initialize @clients = [] end def connect!(opts = {}) if client.isConnected true else client.connect(connection_options(opts)) end end # Subscribe to a topic. # # The required parameter has to respond to the messageArrived method. # # @param processor Paho::Processor An object that responds to the `topic` and `messageArrived` methods def subscribe(processor, opts = {}) topic = processor.topic client_id = id(topic) qos = opts.delete(:qos) { QOS } uri = opts.delete(:uri) { LOCAL_MQTT } log "Creating new subscription for #{topic} with id #{client_id}" c = Paho::MqttClient.new(uri, client_id, nil) c.setCallback(processor) c.connect(connection_options(opts)) c.subscribe(topic, qos) log "Created new subscription for #{topic} with id #{client_id}" @clients << c true rescue Java::OrgEclipsePahoClientMqttv3::MqttException => e fail ArgumentError, e.message end # Publish an MQTT message on a topic # # @param topic String The topic to publish to # @param payload String The contents of the message as a plain string def publish(topic, payload) connect! message = Paho::MqttMessage.new message.setQos 2 message.setPayload payload.to_s.to_java_bytes message.setMutable false log "Publishing to #{topic} #{payload}" client.publish topic, message log "Published to #{topic} #{payload}" end def client @client ||= Paho::MqttClient.new("tcp://localhost:1883", id) end def disconnect! client.disconnect if client.isConnected @clients.each { |c| c.disconnect if c.isConnected } end private # Generate an ID that is unique and repeatable for each machine. This is # required in order to have the clean session flag work correctly. # # @param key String The key to use to generate the client ID. def id(key = Socket.gethostname) "paho-#{key.to_s.crypt('paho')}" end def connection_options(opts = {}) clean_session = opts.fetch(:clean_session) { true } username = opts[:username] password = opts[:password] options = Paho::MqttConnectOptions.new options.setCleanSession clean_session options.setUserName(username) unless username.nil? options.setPassword(password.to_java.toCharArray) unless password.nil? options end def log(m) puts m end end end