require_relative "request" require_relative "vm" module Immunio class Processor attr_accessor :environment # Holds environment info for next channel transmission def initialize(channel, vmfactory, options) @channel = channel @vmfactory = vmfactory @dev_mode = options.fetch(:dev_mode, false) @debug_mode = options.fetch(:debug_mode, false) @log_timings = options.fetch(:log_timings, false) # This hash is not in sync with the one in the VM. It is sent to the VM on initialization. @serverdata = {} # List of hook handlers. hook => Lua function. # Stored in the request on first execution. @hook_handlers = {} @timings = Hash.new do |timings, type| timings[type] = Hash.new do |type_timings, name| type_timings[name] = { "total_duration" => 0, "count" => 0 } end end @timings_mutex = Mutex.new # Package up aggregated timings to send to backend @channel.on_sending do @timings_mutex.synchronize do Immunio.logger.debug {"Aggregated timings since last agentmanager transmission: #{@timings}"} timings = @timings.clone @timings.clear { timings: timings }.tap do |info| next unless @environment Immunio.logger.debug {"Reporting environment info: #{@environment}"} info[:environment] = @environment @environment = nil end end end puts "[IMMUNIO] Dev mode activated!" if @dev_mode end def new_request(request) Immunio.logger.debug { "New request: (started: #{@channel.started?})" } # Reset channel if it was created by parent process @channel.reset if @channel.needs_reset? # Start channel on first request @channel.start unless @channel.started? # Wait until we've received all the hooks before continuing (if # ready_timeout is set) @channel.wait_until_ready! ActiveSupport::Notifications.publish "immunio.new_request", request # Don't process request unless channel is ready (meaning we've loaded all # the hooks) or we're in dev_mode and hooks are loaded from files Request.current = request if (@channel.ready? || @dev_mode) end def aggregate_timings(timings) log_pieces = [] log_pieces << "\nTimings for request (in ms):" if @log_timings request_total = timings["request"]["total"][:total_duration] log_pieces << "\tTotal request time: #{request_total}" if @log_timings @timings_mutex.synchronize do timings.each do |type, type_timings| log_pieces << "\tType: #{type}" if @log_timings && type != "request" type_total = 0 type_timings.each do |name, timing| if @log_timings && type != "request" log_pieces << "\t\t#{name}: #{timing[:total_duration]} (#{timing[:count]})" end @timings[type][name]["total_duration"] += timing[:total_duration] @timings[type][name]["count"] += timing[:count] type_total += timing[:total_duration] end if @log_timings && type != "request" log_pieces << "\tTotal time for type #{type}: #{type_total.round(3)}/#{request_total}" end end end Immunio.logger.info { log_pieces.join("\n") } if @log_timings end def finish_request request = Request.current if request Immunio.logger.debug { "Finishing request #{request.id}" } aggregate_timings(request.timings) ActiveSupport::Notifications.publish "immunio.finish_request", request @channel.send_encoded_message request.encode if request.should_report? end rescue StandardError => e log_and_send_error e, "Error finishing request", request_id: request.try(:id) ensure Request.current = nil end # Some versions of rails, like 4.2.0, fail to make JSONifiable model # instances because they have bugs where superfluous sub-objects of # unserializable classes, like IO, are added. Filter non-array, non-hash, # non-primitive values from data to be JSONified. def make_safe_data(data) if data.is_a?(Hash) data.each_with_object({}) { |(key, value), obj| obj[key] = make_safe_data(value) } elsif data.is_a?(Array) data.map(&method(:make_safe_data)) elsif [Numeric, String, TrueClass, FalseClass].any? { |c| data.is_a? c } data else data.inspect end end # Run the `hook` and return a hash eg.: `{ "allow": true }`. def run_hook(plugin, hook, meta={}) request = Request.current # Hooks called outside of a request are ignored since they are triggered while the framework is loaded. return {} unless request # Notify about the hook. This has no perf cost if there are no subscribers. # Used to test and debug the agent in the test Rails apps. ActiveSupport::Notifications.publish "immunio.hook", plugin, hook, meta timestamp = Time.now.utc.iso8601(6) # The VM & handlers are changed on code update. # So we ensure the request uses the same VM & hook handlers for all hooks. request.vm ||= @vmfactory.new_vm # If there is no registered handler, just log the hook and return. unless request.vm.has_function? hook Immunio.logger.debug { "No hook code for '#{hook}' to run for request #{request.id}" } return {} end # Converts the request data to a Lua table to speedup future calls. request.data = request.vm.create_object(request.data) globals = { "agent_type" => AGENT_TYPE, "agent_version" => VERSION, "timestamp" => timestamp, "plugin" => plugin, "hook" => hook, "meta" => meta, "request" => request.data, } begin Immunio.logger.debug { "Running #{hook} hook for request #{request.id} with global values: #{globals}" } rescue Encoding::CompatibilityError Immunio.logger.debug { "Running #{hook} hook for request #{request.id} (can't log global values due to encoding incompatibility)" } end # Run the hook code in the VM and time the execution. result = Request.time "hook", hook do request.vm.call hook, globals end # result.to_h can be expensive, so put it in a block so it only runs when needed begin Immunio.logger.debug { "Result from #{hook} hook: #{result ? result.to_h : {}}" } rescue Encoding::CompatibilityError Immunio.logger.debug { "Result from #{hook} hook: (can't log result due to encoding incompatibility)" } end result ||= {} if result.respond_to?(:has_key?) and result.has_key?(:diagnostics) result[:diagnostics].to_h.each_value do |diag| Immunio.logger.debug { "Sending Diagnostic Report: #{diag['message']}" } msg = { type: "engine.diagnostic", diagnostic_type: diag['report_type'], diagnostic_message: diag['message'], diagnostic_meta: diag['meta'], diagnostic_version: "0.0.2", agent_version: Immunio::VERSION, request_id: request.id, timestamp: timestamp, plugin: plugin, hook: hook, meta: make_safe_data(meta), vmcode_version: request.vm.code_version, vmdata_version: request.vm.data_version } @channel.send_message msg end end result # Previosuly this only caught VMErrors, however other exceptions can cause 500s # so to be on the safe side make sure we catch anything raised within the VM call --ol rescue StandardError => e # Log and discard VM errors log_and_send_error e, "Error running hook #{hook}", request_id: request.id, timestamp: timestamp, plugin: plugin, hook: hook, meta: make_safe_data(meta), vmcode_version: request.vm.code_version, vmdata_version: request.vm.data_version {} # Return empty result. end # Run the hook and raise a RequestBlocked error if the request should be blocked. def run_hook!(*args) result = run_hook(*args) # Raise if not allowed (default to allow) if !result.fetch("allow", true) Immunio.logger.debug { "Blocking request due to hook response" } raise RequestBlocked, "The request was blocked by the Immunio agent" end # Check result for a response override. if result.has_key?(:override_status) raise OverrideResponse.new(result.fetch(:override_status), result.fetch(:override_headers, []), result.fetch(:override_body, "")) end result end def log_and_send_error(e, message="Error", info={}) Immunio.logger.warn { "#{message}: #{e.message}" } Immunio.logger.warn { "Stack: #{e.backtrace}" } # Re-raise in dev mode before we send it to the backend. raise e if @dev_mode default_info = { type: "engine.exception", exception: e.message, traceback: e.backtrace, agent_version: Immunio::VERSION } @channel.send_message default_info.merge(info) # Re-raise error in test mode so we know when something is broken in hook handlers. raise e if Rails.env.test? end end end