lib/logstash/inputs/zeromq.rb in logstash-input-zeromq-1.0.0 vs lib/logstash/inputs/zeromq.rb in logstash-input-zeromq-2.0.0
- old
+ new
@@ -111,27 +111,30 @@
end
end
end # def register
- def teardown
+ def close
error_check(@zsocket.close, "while closing the zmq socket")
- end # def teardown
+ context.terminate
+ end # def close
def server?
@mode == "server"
end # def server?
def run(output_queue)
host = Socket.gethostname
begin
- loop do
+ while !stop?
# Here's the unified receiver
# Get the first part as the msg
m1 = ""
- rc = @zsocket.recv_string(m1)
+ rc = @zsocket.recv_string(m1, ZMQ::DONTWAIT)
+ next if rc == -1 && ZMQ::Util.errno == ZMQ::EAGAIN
error_check(rc, "in recv_string")
+
@logger.debug("ZMQ receiving", :event => m1)
msg = m1
# If we have more parts, we'll eat the first as the topic
# and set the message to the second part
if @zsocket.more_parts?
@@ -140,20 +143,16 @@
rc2 = @zsocket.recv_string(m2)
error_check(rc2, "in recv_string")
@logger.debug("ZMQ receiving", :event => m2)
msg = m2
end
-
@codec.decode(msg) do |event|
event["host"] ||= host
decorate(event)
output_queue << event
end
end
- rescue LogStash::ShutdownSignal
- # shutdown
- return
rescue => e
@logger.debug("ZMQ Error", :subscriber => @zsocket,
:exception => e)
retry
end # begin
@@ -161,6 +160,7 @@
private
def build_source_string
id = @address.first.clone
end
+
end # class LogStash::Inputs::ZeroMQ