lib/logstash/inputs/zeromq.rb in logstash-input-zeromq-1.0.0 vs lib/logstash/inputs/zeromq.rb in logstash-input-zeromq-2.0.0

- old
+ new

@@ -111,27 +111,30 @@ end end end # def register - def teardown + def close error_check(@zsocket.close, "while closing the zmq socket") - end # def teardown + context.terminate + end # def close def server? @mode == "server" end # def server? def run(output_queue) host = Socket.gethostname begin - loop do + while !stop? # Here's the unified receiver # Get the first part as the msg m1 = "" - rc = @zsocket.recv_string(m1) + rc = @zsocket.recv_string(m1, ZMQ::DONTWAIT) + next if rc == -1 && ZMQ::Util.errno == ZMQ::EAGAIN error_check(rc, "in recv_string") + @logger.debug("ZMQ receiving", :event => m1) msg = m1 # If we have more parts, we'll eat the first as the topic # and set the message to the second part if @zsocket.more_parts? @@ -140,20 +143,16 @@ rc2 = @zsocket.recv_string(m2) error_check(rc2, "in recv_string") @logger.debug("ZMQ receiving", :event => m2) msg = m2 end - @codec.decode(msg) do |event| event["host"] ||= host decorate(event) output_queue << event end end - rescue LogStash::ShutdownSignal - # shutdown - return rescue => e @logger.debug("ZMQ Error", :subscriber => @zsocket, :exception => e) retry end # begin @@ -161,6 +160,7 @@ private def build_source_string id = @address.first.clone end + end # class LogStash::Inputs::ZeroMQ