require 'socket'
require 'openssl'
require 'stringio'
require 'fileutils'
require 'msgpack'
require 'flydata/command_loggable'

module Flydata
  module Output
    # Builds the forwarder matching a configured forwarder key.
    class ForwarderFactory
      include CommandLoggable

      # Creates a forwarder instance.
      #
      # @param forwarder_key [String, nil] "tcpforwarder" (or nil) for plain
      #   TCP, "sslforwarder" for TLS
      # @param tag [String] tag passed through to the forwarder
      # @param servers [Array<String>] "host:port" entries
      # @param options [Hash] passed through to the forwarder
      # @return [TcpForwarder, SslForwarder]
      # @raise [RuntimeError] when forwarder_key is not a supported type
      def self.create(forwarder_key, tag, servers, options = {})
        case forwarder_key
        when nil, "tcpforwarder"
          puts("Creating TCP connection") if FLYDATA_DEBUG
          TcpForwarder.new(tag, servers, options)
        when "sslforwarder"
          puts("Creating SSL connection") if FLYDATA_DEBUG
          SslForwarder.new(tag, servers, options)
        else
          raise "Unsupported Forwarding type #{forwarder_key}"
        end
      end
    end

    # Buffers msgpack-encoded [time, record] events in memory and sends them
    # over a TCP connection, rotating through the configured servers.
    class TcpForwarder
      include CommandLoggable

      # First byte of every message: msgpack header of a 2-element array
      # (the tag followed by the packed records).
      FORWARD_HEADER = [0x92].pack('C')
      BUFFER_SIZE = 1024 * 1024 * 32 # 32M
      DEFAULT_SEND_TIMEOUT = 60 # 1 minute
      # Misspelled original name, kept so existing references keep working.
      DEFUALT_SEND_TIMEOUT = DEFAULT_SEND_TIMEOUT
      RETRY_INTERVAL = 2
      RETRY_LIMIT = 10

      attr_reader :buffer_record_count, :buffer_size

      # @param tag [String] tag written in front of every batch
      # @param servers [Array<String>] non-empty list of "host:port" entries
      # @param options [Hash] see #set_options
      # @raise [RuntimeError] when servers is not a non-empty Array
      def initialize(tag, servers, options = {})
        @tag = tag
        unless servers.kind_of?(Array) && !servers.empty?
          raise "Servers must not be empty."
        end
        @servers = servers
        @server_index = 0
        set_options(options)
        reset
      end

      # Applies options; only :buffer_size_limit is read (default BUFFER_SIZE).
      def set_options(options)
        @buffer_size_limit = options[:buffer_size_limit] || BUFFER_SIZE
      end

      # Appends one record or an Array of records to the buffer.
      #
      # @param records [Object, Array] a single record or a list of records
      # @param time [Integer] timestamp packed with each record
      # @return [Boolean] result of #send when the buffer grew past the size
      #   limit, false otherwise
      def emit(records, time = Time.now.to_i)
        records = [records] unless records.kind_of?(Array)
        records.each do |record|
          event_data = [time, record].to_msgpack
          @buffer_records << event_data
          @buffer_record_count += 1
          @buffer_size += event_data.bytesize
        end
        @buffer_size > @buffer_size_limit ? send : false
      end

      # Sends the buffered records to the next server and clears the buffer.
      # A failed attempt is retried up to RETRY_LIMIT times, sleeping
      # RETRY_INTERVAL ** retry_count seconds between attempts; each attempt
      # asks #pickup_server for the next server.
      #
      # NOTE: this shadows Object#send for instances of this class.
      #
      # @return [Boolean] false when the buffer is empty, true otherwise
      # @raise [StandardError] the last error once the retry limit is exceeded
      def send
        return false unless @buffer_size > 0

        # Benchmark mode: drop the buffer without touching the network.
        if ENV['FLYDATA_BENCHMARK']
          reset
          return true
        end

        sock = nil
        retry_count = 0
        begin
          sock = connect(pickup_server)

          # Write header
          sock.write FORWARD_HEADER

          # Write tag
          sock.write @tag.to_msgpack

          # Write records as one msgpack str32 (0xdb + 4-byte big-endian size)
          sock.write [0xdb, @buffer_records.bytesize].pack('CN')
          StringIO.open(@buffer_records) do |i|
            FileUtils.copy_stream(i, sock)
          end
        rescue => e
          # `retry` restarts the begin body without running `ensure`, so the
          # socket of the failed attempt has to be closed here or it leaks.
          close_quietly(sock)
          sock = nil

          retry_count += 1
          if retry_count > RETRY_LIMIT
            log_error_stderr("! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}")
            raise
          end
          log_warn_stderr("! Warn: Retrying to send data. retry_count:#{retry_count} error=#{e.to_s}")
          wait_time = RETRY_INTERVAL ** retry_count
          log_warn_stderr(" Now waiting for next retry. time=#{wait_time}sec")
          sleep wait_time
          retry
        ensure
          close_quietly(sock)
        end
        reset
        true
      end

      # Returns the next server in round-robin order.
      # TODO: Check server status
      def pickup_server
        server = @servers[@server_index]
        @server_index = (@server_index + 1) % @servers.count
        server
      end

      # Opens a TCP connection to a "host:port" server and sets the linger and
      # send-timeout socket options to DEFAULT_SEND_TIMEOUT seconds.
      #
      # @param server [String] "host:port"
      # @return [TCPSocket]
      def connect(server)
        host, port = server.split(':')
        sock = TCPSocket.new(host, port.to_i)
        begin
          # struct linger { l_onoff = 1, l_linger = timeout }
          opt = [1, DEFAULT_SEND_TIMEOUT].pack('I!I!')
          sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

          # struct timeval { tv_sec = timeout, tv_usec = 0 }
          opt = [DEFAULT_SEND_TIMEOUT, 0].pack('L!L!')
          sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
        rescue
          # Do not leak the descriptor when an option cannot be applied.
          close_quietly(sock)
          raise
        end
        sock
      end

      # Empties the buffer and zeroes its counters.
      def reset
        # Explicit mutable binary string: the buffer holds packed msgpack bytes.
        @buffer_records = String.new
        @buffer_record_count = 0
        @buffer_size = 0
      end

      # Sends whatever is currently buffered.
      def flush
        send
      end

      # Flushes the buffer; no connection is kept open between sends.
      def close
        flush
      end

      private

      # Best-effort close: errors raised while closing are ignored.
      def close_quietly(sock)
        sock.close if sock
      rescue StandardError
        nil
      end
    end

    # TcpForwarder variant that wraps the connection in TLS and verifies the
    # peer certificate against the system default certificate store.
    class SslForwarder < TcpForwarder
      # Opens the TCP connection via super and performs the TLS handshake.
      #
      # NOTE(review): the certificate chain is verified (VERIFY_PEER), but
      # nothing visible here checks the certificate against the server
      # hostname -- confirm whether that is intended.
      #
      # @param server [String] "host:port"
      # @return [OpenSSL::SSL::SSLSocket]
      def connect(server)
        tcp_sock = super
        begin
          ssl_sock = OpenSSL::SSL::SSLSocket.new(tcp_sock, ssl_ctx_with_verification)
          # Closing the SSL socket also closes the underlying TCP socket.
          ssl_sock.sync_close = true
          ssl_sock.connect
          ssl_sock
        rescue
          # Handshake or verification failed: the caller never receives a
          # socket to close, so release the TCP connection here.
          close_quietly(tcp_sock)
          raise
        end
      end

      private

      # Builds an SSL context that requires a valid peer certificate chain
      # rooted in the system default CA paths.
      def ssl_ctx_with_verification
        cert_store = OpenSSL::X509::Store.new
        cert_store.set_default_paths

        ssl_ctx = OpenSSL::SSL::SSLContext.new
        ssl_ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
        ssl_ctx.cert_store = cert_store
        ssl_ctx
      end
    end
  end
end