require "thread" require "faraday" require "faraday_middleware" require "msgpack" require "base64" require_relative "errors" require_relative "logger" require_relative "version" module Immunio # Communication channel with the Immunio webservice. class Channel DIGEST = OpenSSL::Digest.new('sha1') attr_reader :success_count, :error_count, :message_queue attr_reader :rejected_message_count def initialize(config) @config = config @agent_uuid = nil # Messages waiting to be sent. @message_queue = Queue.new # Messages that were sent but failed. Need to be resent. @send_buffer = [] @send_buffer_bytes = 0 @last_report_time = 0 # A large message we may have popped from the queue but couldn't fit # in the current report @next_message = nil @send_seq = 0 @dropped_message_count = 0 @rejected_message_count = 0 @success_count = 0 @error_count = 0 @quick_connect = true @started = false @ready = false @callbacks = [] # Anything looking to add to the messages sent to the server: @senders = [] end def ready? @ready end def set_ready @ready = true end def started? @started end def messages_count @message_queue.size end def start return if @started @started = true @thread = Thread.new { run } end # Stop and wait for the last messages to be sent. def stop return unless @started Immunio.logger.debug { "Stopping channel" } @started = false @ready = false if @thread @thread.kill @thread.join end end def send_message(message) send_encoded_message message.to_msgpack end def send_encoded_message(message) if @message_queue.size > @config.max_send_queue_size Immunio.logger.warn { "Dropping message for agent manager due to queue overflow (#{@message_queue.size} > #{@config.max_send_queue_size})" } Immunio.logger.debug { "Dropped message: (#{message})" } # No room for this message on the queue. Discard. @dropped_message_count += 1 return end Immunio.logger.debug {"Sending message to backend: #{MessagePack.unpack(message)}"} @message_queue << message end def on_message(&block) @callbacks << block end def on_sending(&block) @senders << block end # Wait until we receive a message from the agentmanager. # This is used primarily for internal testing to wait until all the hooks # are loaded. def wait_until_ready! return if @ready if @config.ready_timeout.to_i <= 0 return end Immunio.logger.debug { "Channel waiting #{@config.ready_timeout.to_i} seconds until ready..." } Timeout.timeout @config.ready_timeout.to_i do # Wait until we get a response from the agentmanager sleep 0.1 until ready? Immunio.logger.debug { "Channel ready!" } end end protected def setup_connection(faraday) # Override to modify Faraday options & middlewares faraday.adapter Faraday.default_adapter end private # Core method running in a thread def run Immunio.logger.debug { "Starting channel on thread #{Thread.current.object_id}" } # Create an empty cert_store to prevent Faraday from using the system default OpenSSL store. cert_store = OpenSSL::X509::Store.new # Setup the connection for making requests to the server. @connection = Faraday::Connection.new(@config.hello_url, ssl: { ca_file: "#{Immunio::DIR}/immunio/immunio_ca.crt", cert_store: cert_store }, request: { timeout: @config.http_timeout }) do |faraday| faraday.request :url_encoded # Provide a hook for additional Faraday config. setup_connection faraday end # Get the polling URL from the server. hello # Start sending messages while @started poll @success_count += 1 @error_count = 0 end rescue StandardError => e # Retry forever on error @error_count += 1 log_error(e) @success_count = 0 @quick_connect = true exponential_backoff retry end def log_error(e) if @error_count == 1 Immunio.logger.warn { "Connection failed after #{@success_count} successes: #{e} (#{e.class})" } else Immunio.logger.warn { "Connection failure [#{@error_count}]: #{e} (#{e.class})" } end Immunio.logger.debug { e.backtrace.join("\n") } end def exponential_backoff() # Exponential backoff delay_ms = @config.initial_delay_ms * (2 ** (@error_count - 1)) # Cap at max_delay_ms delay_ms = [delay_ms, @config.max_delay_ms].min # choose a random delay less than the computed delay. # The randomness avoids a herd effect from transient failures. delay_ms *= rand delay_ms = delay_ms.round Immunio.logger.info { "Delaying #{delay_ms} ms before next request" } sleep delay_ms / 1000.0 end def notify(message) message = message.symbolize_keys! @callbacks.each { |callback| callback.call(message) } end def raw_log(raw) raw.encode 'utf-8', invalid: :replace, undef: :replace end # Send initial hello request. def hello response = @connection.get do |req| req.url "/" req.params['name'] = AGENT_TYPE req.params['version'] = VERSION req.params['key'] = @config[:key] req.headers['Accept'] = 'application/x-msgpack' Immunio.logger.trace {"Sending hello request to agent manager (#{req})"} end if response.status != 200 then raise Error, "Bad response from Immunio server: #{response.status} #{raw_log response.body}" end Immunio.logger.trace {"Received hello response from agent manager (status: #{response.status}, body: #{raw_log response.body})"} body = MessagePack.unpack(response.body) @polling_url = body["url"] if @polling_url.blank? then raise Error, "No URL in HELLO response: #{response.body}" end Immunio.logger.info { "Agent connected to #{@config.hello_url}" } end # Execute a block for at max a given time to match `@config.max_report_interval`. def with_report_timeout time_since_last_report = Time.now.to_i - @last_report_time # Interval expired already return if time_since_last_report >= @config.max_report_interval # Compute time to wait for new messages timeout = @config.max_report_interval - time_since_last_report Timeout.timeout timeout do yield end rescue Timeout::Error # rubocop:disable Lint/HandleExceptions # Timeout expired end def send_buffer_has_room(extra_bytes) used_bytes = @send_buffer_bytes + extra_bytes + @next_message.bytesize return @send_buffer.size < @config.max_report_size && used_bytes < @config.max_report_bytes end def add_to_send_buffer(message) @send_buffer_bytes += message.bytesize @send_buffer << message end # Fill send_buffer with messages to send def collect_messages(used_bytes) # if we had a message leftover from last send, make sure its not # so big that it would fill the whole buffer if @next_message != nil if send_buffer_has_room used_bytes add_to_send_buffer @next_message else Immunio.logger.warn { "Dropped message over max byte send size, next message size #{@next_message.bytesize}" } Immunio.logger.debug { "Dropped next message used: #{used_bytes} over max byte: #{@next_message}" } @dropped_message_count += 1 end @next_message = nil end # Empty the queue as much as possible. while !@message_queue.empty? @next_message = @message_queue.pop if !send_buffer_has_room used_bytes break end add_to_send_buffer @next_message @next_message = nil end # Return immediately if we've got the minimum number of messages to # report or we're still waiting for an agent.ready message, # or if we're at the maximum buffer size return @send_buffer if @send_buffer.size >= @config.min_report_size || @next_message != nil # Wait for messages from the queue until we have enough or get a timeout. with_report_timeout do while @send_buffer.size < @config.min_report_size # If there are no messages in the queue, this will block until one arrives. @next_message = @message_queue.pop if !send_buffer_has_room used_bytes break end add_to_send_buffer @next_message @next_message = nil end end @send_buffer end def gzip(s) gzip_io = StringIO.new begin gzip = Zlib::GzipWriter.new(gzip_io) gzip.write s ret = gzip_io.string ensure gzip.close end ret end # Poll the server sending queued messages at the same time. def poll # Prep data body = { send_seq: @send_seq, dropped_message_count: @dropped_message_count, rejected_message_count: @rejected_message_count, name: AGENT_TYPE, version: VERSION, vm_version: VM_VERSION, } body[:agent_uuid] = @agent_uuid if @agent_uuid # Add any values that senders would like to send: @senders.each { |callback| body.merge! callback.call() } # Encode the body of the request using msgpack streaming API. # See http://ruby.msgpack.org/MessagePack/Packer.html#write_map_header-instance_method for other methods. packer = MessagePack::Packer.new # Pack the `body` hash. packer.write_map_header body.size + 1 # body's + 'msgs' body.each_pair { |key, value| packer.write(key).write(value) } packer.write 'msgs' # The key if not @quick_connect # Append the prepacked messages. collect_messages packer.size packer.write_array_header @send_buffer.size @send_buffer.each { |message| packer.buffer << message } else packer.write_array_header 0 @quick_connect = false end encoded_body = packer.to_s # Send the request response = @connection.post do |req| req.url @polling_url req.params['name'] = AGENT_TYPE req.params['version'] = VERSION req.params['key'] = @config[:key] req.params['sig'] = OpenSSL::HMAC.hexdigest(DIGEST, @config.secret, encoded_body) req.headers['Content-Type'] = 'application/x-msgpack' req.headers['Content-Encoding'] = 'gzip' req.headers['Accept'] = 'application/x-msgpack' req.body = gzip(encoded_body) Immunio.logger.trace {"Sending request to agent manager (data: #{MessagePack.unpack(encoded_body)}, request: #{req})"} end if response.status >= 400 and response.status < 500 then # 4XX response codes should NOT be retried. Discard the report. @rejected_message_count += @send_buffer.size Immunio.logger.trace { "Rejecting #{@send_buffer.size} messages" } @send_buffer = [] @send_buffer_bytes = 0 Immunio.logger.trace {"Received response from agent manager (status: #{response.status}, raw body: #{raw_log response.body})"} raise Error, "Bad response from Immunio server: #{response.status} #{response.body}" end if response.status >= 500 then # 5XX response codes are treated like errors. Immunio.logger.trace {"Received response from agent manager (status: #{response.status}, raw body: #{raw_log response.body})"} raise Error, "Bad response from Immunio server: #{response.status} #{response.body}" end body = MessagePack.unpack(response.body) Immunio.logger.trace {"Received response from agent manager (status: #{response.status}, body: #{body}, raw body: #{raw_log response.body})"} # Update local data from response new_agent_uuid = body["agent_uuid"] if new_agent_uuid Immunio.logger.info { "Agent UUID: #{new_agent_uuid}" } if new_agent_uuid != @agent_uuid @agent_uuid = new_agent_uuid end @send_seq += @send_buffer.size # If messages were delivered successfully, clear send buffer. if response.success? @send_buffer = [] @send_buffer_bytes = 0 end @last_report_time = Time.now.to_i received_messages = body["msgs"] if received_messages received_messages.each { |message| notify message } end end end end