lib/fluent/plugin/out_forward.rb in fluentd-0.14.24 vs lib/fluent/plugin/out_forward.rb in fluentd-0.14.25
- old
+ new
@@ -431,20 +431,20 @@
# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
- rollback_write(info.chunk_id)
+ rollback_write(info.chunk_id, update_retry: false)
return nil
else
unpacker.feed(raw_data)
res = unpacker.read
log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
if res['ack'] != info.chunk_id_base64
# Some errors may have occured when ack and chunk id is different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
- rollback_write(info.chunk_id)
+ rollback_write(info.chunk_id, update_retry: false)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
end
return info.chunk_id
@@ -481,10 +481,10 @@
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
- rollback_write(info.chunk_id)
+ rollback_write(info.chunk_id, update_retry: false)
else
sockets << info.sock
new_list << info
end
end