lib/logstash/outputs/zeromq.rb in logstash-output-zeromq-2.0.4 vs lib/logstash/outputs/zeromq.rb in logstash-output-zeromq-2.1.0

- old
+ new

@@ -24,11 +24,11 @@ # The default logstash topologies work as follows: # # * pushpull - inputs are pull, outputs are push # * pubsub - inputs are subscribers, outputs are publishers - # * pair - inputs are clients, inputs are servers + # * pair - inputs are clients, outputs are servers # # If the predefined topology flows don't work for you, # you can change the 'mode' setting config :topology, :validate => ["pushpull", "pubsub", "pair"], :required => true @@ -59,41 +59,18 @@ # } config :sockopt, :validate => :hash public def register - require "ffi-rzmq" - require "logstash/util/zeromq" - self.class.send(:include, LogStash::Util::ZeroMQ) + load_zmq if @mode == "server" workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!") end - # Translate topology shorthand to socket types - case @topology - when "pair" - zmq_const = ZMQ::PAIR - when "pushpull" - zmq_const = ZMQ::PUSH - when "pubsub" - zmq_const = ZMQ::PUB - end # case socket_type + connect - @zsocket = context.socket(zmq_const) - - error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), - "while setting ZMQ::LINGER == 1)") - - if @sockopt - setopts(@zsocket, @sockopt) - end - - @address.each do |addr| - setup(@zsocket, addr) - end - @codec.on_event(&method(:publish)) end # def register public def close @@ -102,30 +79,57 @@ rescue RuntimeError => e @logger.error("Failed to properly teardown ZeroMQ") end end # def close + def receive(event) + @codec.encode(event) + end + private def server? @mode == "server" end # def server? - public - def receive(event) - - - @codec.encode(event) - end # def receive - def publish(event, payload) - @logger.debug? && @logger.debug("0mq: sending", :event => payload) if @topology == "pubsub" - # TODO(sissel): Need to figure out how to fit this into the codecs system. - #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}") - #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE), - #"in topic send_string") + topic = event.sprintf(@topic) + error_check(@zsocket.send_string(topic, ZMQ::SNDMORE), "in topic send_string") end + @logger.debug? && @logger.debug("0mq: sending", :event => payload) error_check(@zsocket.send_string(payload), "in send_string") rescue => e + warn e.inspect @logger.warn("0mq output exception", :address => @address, :exception => e) + end + + def load_zmq + require "ffi-rzmq" + require "logstash/plugin_mixins/zeromq" + self.class.send(:include, LogStash::PluginMixins::ZeroMQ) + end + + def connect + # Translate topology shorthand to socket types + case @topology + when "pair" + zmq_const = ZMQ::PAIR + when "pushpull" + zmq_const = ZMQ::PUSH + when "pubsub" + zmq_const = ZMQ::PUB + end # case socket_type + + @zsocket = context.socket(zmq_const) + + error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), + "while setting ZMQ::LINGER == 1)") + + if @sockopt + setopts(@zsocket, @sockopt) + end + + @address.each do |addr| + setup(@zsocket, addr) + end end end # class LogStash::Outputs::ZeroMQ