require 'socket'
require 'eventmachine'
require 'msgpack'

module Ganymed
  module Client

    ##
    # An EventMachine Protocol that can send events to a Ganymed processor.
    #
    # The module mixes in EM::Deferrable; {#connection_completed} calls
    # +succeed+ once the connection is established.
    #
    module Processor
      include EM::Deferrable

      # Emit a new Event.
      #
      # The event is serialized with MessagePack and handed to +send_data+.
      # Defaults are merged with +opts+ via Hash#merge, so a key explicitly
      # passed as +nil+ overrides its default.
      #
      # @param [String] ns Event namespace.
      # @param [Object] value Event value.
      # @param [Hash] opts Options
      # @option opts [String] cf Consolidation function used in this event.
      #   Defaults to nil.
      # @option opts [Time] now Event timestamp. Defaults to the current UTC
      #   time; sent as integer seconds (+to_i+).
      # @option opts [Integer] resolution Event resolution. Defaults to 0.
      # @option opts [String] origin Event origin. Defaults to the origin
      #   this connection was created with.
      def event(ns, value, opts={})
        opts = {
          :cf => nil,
          :now => Time.now.utc,
          :resolution => 0,
          :origin => @origin,
        }.merge(opts)

        data = {
          :_type => :event,
          :n => ns,
          :c => opts[:cf],
          :o => opts[:origin],
          :t => opts[:now].to_i,
          :r => opts[:resolution].to_i,
          :v => value
        }

        send_data(data.to_msgpack)
      end

      # Emit a metadata object to the Processor.
      #
      # A metadata object contains arbitrary data related to the origin. This
      # can be queried by Websocket clients for displaying information about
      # origins.
      #
      # @param [Hash] data Metadata object.
      # @param [Hash] opts Options
      # @option opts [String] origin Metadata origin. Defaults to the origin
      #   this connection was created with.
      def metadata(data, opts={})
        opts = {
          :origin => @origin,
        }.merge(opts)

        data = {
          :_type => :metadata,
          :origin => opts[:origin],
          :data => data,
        }

        send_data(data.to_msgpack)
      end

      # Connect to a Ganymed processor.
      #
      # Host and port are passed to EM.connect twice on purpose: once as the
      # connection target and once as constructor arguments, so the instance
      # can remember them for reconnecting.
      #
      # @param [String] host Host to connect to.
      # @param [Integer] port Port to connect to.
      # @param [String] origin Origin of events. Defaults to the
      #                        fully-qualified hostname.
      def self.connect(host, port, origin=nil)
        EM.connect(host, port, self, host, port, origin)
      end

      # Remember the connection target and determine the event origin.
      #
      # When no origin is given, the canonical name of the local host is
      # looked up. If that lookup fails (the local hostname does not resolve),
      # the plain hostname is used instead of letting SocketError abort the
      # construction of the connection.
      #
      # @private
      def initialize(host, port, origin=nil)
        @host, @port = host, port
        @origin = origin || begin
          ::Socket.gethostbyname(::Socket.gethostname).first
        rescue ::SocketError
          ::Socket.gethostname
        end
      end

      # Mark the connection as established and fire the deferrable's
      # success callbacks.
      #
      # @private
      def connection_completed
        @reconnecting = false
        @connected = true
        succeed
      end

      # Handle a closed connection.
      #
      # If the connection had been established before (or a reconnect is
      # already in progress), a reconnect attempt is scheduled one second
      # later. Otherwise the very first connection attempt failed and an
      # error is raised.
      #
      # @raise [RuntimeError] if the initial connection attempt failed.
      # @private
      def unbind
        if @connected || @reconnecting
          EM.add_timer(1) { reconnect(@host, @port) }
          @connected = false
          @reconnecting = true
          # Clears the status stored by EM::Deferrable; presumably so that
          # callbacks registered while disconnected wait until
          # connection_completed calls succeed again -- TODO confirm against
          # the EM::Deferrable implementation in use.
          @deferred_status = nil
        else
          raise "unable to connect #{@origin} to ganymed processor at #{@host}:#{@port}"
        end
      end
    end
  end
end