Sha256: 3fa2a8365e1cea82c238a2dc6654e9e0f87dc967ed0b7bb7f6f65c0376ce8072

Contents?: true

Size: 1.4 KB

Versions: 4

Compression:

Stored size: 1.4 KB

Contents

# encoding: utf-8
require 'ffi-rzmq'
require "logstash/namespace"

module LogStash::Util::ZeroMQ
  # LOGSTASH-400
  # see https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L93-117
  STRING_OPTS = %w{IDENTITY SUBSCRIBE UNSUBSCRIBE}

  def context
    @context ||= ZMQ::Context.new
  end

  def setup(socket, address)
    if server?
      error_check(socket.bind(address), "binding to #{address}")
    else
      error_check(socket.connect(address), "connecting to #{address}")
    end
    @logger.info("0mq: #{server? ? 'connected' : 'bound'}", :address => address)
  end

  def error_check(rc, doing)
    unless ZMQ::Util.resultcode_ok?(rc)
      @logger.error("ZeroMQ error while #{doing}", { :error_code => rc })
      raise "ZeroMQ Error while #{doing}"
    end
  end # def error_check

  def setopts(socket, options)
    options.each do |opt,value|
      sockopt = opt.split('::')[1]
      option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt)
      unless STRING_OPTS.include?(sockopt)
        begin
          Float(value)
          value = value.to_i
        rescue ArgumentError
          raise "#{sockopt} requires a numeric value. #{value} is not numeric"
        end
      end # end unless
      error_check(socket.setsockopt(option, value),
              "while setting #{opt} == #{value}")
    end # end each
  end # end setopts
end # module LogStash::Util::ZeroMQ

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
logstash-input-zeromq-2.0.4 lib/logstash/util/zeromq.rb
logstash-input-zeromq-2.0.2 lib/logstash/util/zeromq.rb
logstash-input-zeromq-2.0.1 lib/logstash/util/zeromq.rb
logstash-input-zeromq-2.0.0 lib/logstash/util/zeromq.rb