Sha256: 3258c7631b43214798b39cebc20ae95ae3cde62936839bcc2b89d99b18132972

Contents?: true

Size: 1.41 KB

Versions: 27

Compression:

Stored size: 1.41 KB

Contents

# encoding: utf-8
require 'ffi-rzmq'
require "logstash/namespace"

# Helpers shared by the ZeroMQ plugins for wiring up ffi-rzmq sockets.
#
# The methods below call `server?` and use `@logger`, neither of which is
# defined in this module; they are expected to be supplied by whatever
# includes it.
module LogStash::Util::ZeroMQ
  # Single ZeroMQ context created when this file is loaded and handed out by
  # #context.
  CONTEXT = ZMQ::Context.new
  # LOGSTASH-400
  # see https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L93-117
  # Socket options whose values are passed through as-is; every other option
  # value is coerced to an integer in #setopts.
  STRING_OPTS = %w{IDENTITY SUBSCRIBE UNSUBSCRIBE}.freeze

  # Returns the module-wide ZMQ::Context.
  def context
    CONTEXT
  end

  # Binds +socket+ to +address+ when `server?` is true, otherwise connects
  # +socket+ to +address+, then logs which of the two happened.
  #
  # Raises RuntimeError (via #error_check) when the bind/connect call returns
  # a failing result code.
  def setup(socket, address)
    if server?
      error_check(socket.bind(address), "binding to #{address}")
      state = 'bound'
    else
      error_check(socket.connect(address), "connecting to #{address}")
      state = 'connected'
    end
    # Servers bind and clients connect; report the action actually taken.
    @logger.info("0mq: #{state}", :address => address)
  end

  # Logs and raises when +rc+ is not a successful ZeroMQ result code.
  #
  # rc    - result code returned by an ffi-rzmq call
  # doing - short description of the attempted action, phrased so it reads
  #         naturally after "ZeroMQ error while ..."
  #
  # Raises RuntimeError when ZMQ::Util.resultcode_ok?(rc) is false.
  def error_check(rc, doing)
    unless ZMQ::Util.resultcode_ok?(rc)
      @logger.error("ZeroMQ error while #{doing}", { :error_code => rc })
      raise "ZeroMQ Error while #{doing}"
    end
  end # def error_check

  # Applies each entry of +options+ to +socket+ via setsockopt.
  #
  # options - enumerable of [name, value] pairs; each name is a string of the
  #           form "ZMQ::OPTION" (only the segment after the first "::" is
  #           used to look up the constant on ZMQ).
  #
  # Values for options listed in STRING_OPTS are passed unchanged. All other
  # values must be numeric and are converted with to_i (so any fractional
  # part is dropped).
  #
  # Raises RuntimeError when an option name lacks the "::" separator, when a
  # non-string option has a non-numeric value, or when setsockopt fails.
  def setopts(socket, options)
    options.each do |opt,value|
      sockopt = opt.split('::')[1]
      # Without this guard a malformed name surfaces as an opaque TypeError
      # from const_defined?(nil).
      if sockopt.nil?
        raise "Invalid ZeroMQ socket option name #{opt.inspect}; expected the form ZMQ::OPTION"
      end
      option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt)
      unless STRING_OPTS.include?(sockopt)
        begin
          # Float() is used purely as a validity check; it raises
          # ArgumentError for non-numeric strings and TypeError for values
          # such as nil that cannot be coerced at all.
          Float(value)
          value = value.to_i
        rescue ArgumentError, TypeError
          raise "#{sockopt} requires a numeric value. #{value} is not numeric"
        end
      end # end unless
      # error_check already prefixes "while", so the description must not.
      error_check(socket.setsockopt(option, value),
              "setting #{opt} == #{value}")
    end # end each
  end # end setopts
end # module LogStash::Util::ZeroMQ

Version data entries

27 entries across 27 versions & 4 rubygems

Version Path
logstash-filter-zeromq-0.1.3 lib/logstash/util/zeromq.rb
logstash-output-zeromq-0.1.2 lib/logstash/util/zeromq.rb
logstash-output-zeromq-0.1.1 lib/logstash/util/zeromq.rb
logstash-input-zeromq-0.1.1 lib/logstash/util/zeromq.rb
logstash-output-zeromq-0.1.0 lib/logstash/util/zeromq.rb
logstash-input-zeromq-0.1.0 lib/logstash/util/zeromq.rb
logstash-lib-1.3.2 lib/logstash/util/zeromq.rb