Sha256: 3fa2a8365e1cea82c238a2dc6654e9e0f87dc967ed0b7bb7f6f65c0376ce8072
Contents?: true
Size: 1.4 KB
Versions: 4
Compression:
Stored size: 1.4 KB
Contents
# encoding: utf-8 require 'ffi-rzmq' require "logstash/namespace" module LogStash::Util::ZeroMQ # LOGSTASH-400 # see https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L93-117 STRING_OPTS = %w{IDENTITY SUBSCRIBE UNSUBSCRIBE} def context @context ||= ZMQ::Context.new end def setup(socket, address) if server? error_check(socket.bind(address), "binding to #{address}") else error_check(socket.connect(address), "connecting to #{address}") end @logger.info("0mq: #{server? ? 'connected' : 'bound'}", :address => address) end def error_check(rc, doing) unless ZMQ::Util.resultcode_ok?(rc) @logger.error("ZeroMQ error while #{doing}", { :error_code => rc }) raise "ZeroMQ Error while #{doing}" end end # def error_check def setopts(socket, options) options.each do |opt,value| sockopt = opt.split('::')[1] option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) unless STRING_OPTS.include?(sockopt) begin Float(value) value = value.to_i rescue ArgumentError raise "#{sockopt} requires a numeric value. #{value} is not numeric" end end # end unless error_check(socket.setsockopt(option, value), "while setting #{opt} == #{value}") end # end each end # end setopts end # module LogStash::Util::ZeroMQ
Version data entries
4 entries across 4 versions & 1 rubygems