lib/logstash/outputs/zeromq.rb in logstash-output-zeromq-2.0.4 vs lib/logstash/outputs/zeromq.rb in logstash-output-zeromq-2.1.0
- old
+ new
@@ -24,11 +24,11 @@
# The default logstash topologies work as follows:
#
# * pushpull - inputs are pull, outputs are push
# * pubsub - inputs are subscribers, outputs are publishers
- # * pair - inputs are clients, inputs are servers
+ # * pair - inputs are clients, outputs are servers
#
# If the predefined topology flows don't work for you,
# you can change the 'mode' setting
config :topology, :validate => ["pushpull", "pubsub", "pair"], :required => true
@@ -59,41 +59,18 @@
# }
config :sockopt, :validate => :hash
public
def register
- require "ffi-rzmq"
- require "logstash/util/zeromq"
- self.class.send(:include, LogStash::Util::ZeroMQ)
+ load_zmq
if @mode == "server"
workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!")
end
- # Translate topology shorthand to socket types
- case @topology
- when "pair"
- zmq_const = ZMQ::PAIR
- when "pushpull"
- zmq_const = ZMQ::PUSH
- when "pubsub"
- zmq_const = ZMQ::PUB
- end # case socket_type
+ connect
- @zsocket = context.socket(zmq_const)
-
- error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
- "while setting ZMQ::LINGER == 1)")
-
- if @sockopt
- setopts(@zsocket, @sockopt)
- end
-
- @address.each do |addr|
- setup(@zsocket, addr)
- end
-
@codec.on_event(&method(:publish))
end # def register
public
def close
@@ -102,30 +79,57 @@
rescue RuntimeError => e
@logger.error("Failed to properly teardown ZeroMQ")
end
end # def close
+ def receive(event)
+ @codec.encode(event)
+ end
+
private
def server?
@mode == "server"
end # def server?
- public
- def receive(event)
-
-
- @codec.encode(event)
- end # def receive
-
def publish(event, payload)
- @logger.debug? && @logger.debug("0mq: sending", :event => payload)
if @topology == "pubsub"
- # TODO(sissel): Need to figure out how to fit this into the codecs system.
- #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
- #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
- #"in topic send_string")
+ topic = event.sprintf(@topic)
+ error_check(@zsocket.send_string(topic, ZMQ::SNDMORE), "in topic send_string")
end
+ @logger.debug? && @logger.debug("0mq: sending", :event => payload)
error_check(@zsocket.send_string(payload), "in send_string")
rescue => e
+ warn e.inspect
@logger.warn("0mq output exception", :address => @address, :exception => e)
+ end
+
+ def load_zmq
+ require "ffi-rzmq"
+ require "logstash/plugin_mixins/zeromq"
+ self.class.send(:include, LogStash::PluginMixins::ZeroMQ)
+ end
+
+ def connect
+ # Translate topology shorthand to socket types
+ case @topology
+ when "pair"
+ zmq_const = ZMQ::PAIR
+ when "pushpull"
+ zmq_const = ZMQ::PUSH
+ when "pubsub"
+ zmq_const = ZMQ::PUB
+ end # case socket_type
+
+ @zsocket = context.socket(zmq_const)
+
+ error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
+ "while setting ZMQ::LINGER == 1)")
+
+ if @sockopt
+ setopts(@zsocket, @sockopt)
+ end
+
+ @address.each do |addr|
+ setup(@zsocket, addr)
+ end
end
end # class LogStash::Outputs::ZeroMQ