lib/fluent/plugin/xray.rb in fluent-plugin-jfrog-siem-2.0.6 vs lib/fluent/plugin/xray.rb in fluent-plugin-jfrog-siem-2.0.7

- old
+ new

@@ -1,195 +1,199 @@ -require 'concurrent' -require 'concurrent-edge' -require 'json' -require "fluent/plugin/position_file" - -class Xray - def initialize(jpd_url, username, api_key, token, wait_interval, batch_size, pos_file_path, router, tag) - @jpd_url = jpd_url - @username = username - @api_key = api_key - @token = token - @wait_interval = wait_interval - @batch_size = batch_size - @pos_file_path = pos_file_path - @router = router - @tag = tag - end - - def violations(date_since) - violations_channel = Concurrent::Channel.new(capacity: @batch_size) - page_number = 1 - timer_task = Concurrent::TimerTask.new(execution_interval: @wait_interval, timeout_interval: 30) do - xray_json = {"filters": { "created_from": date_since }, "pagination": {"order_by": "created","limit": @batch_size ,"offset": page_number } } - puts "Fetching Xray Violations with #{xray_json} parameters" - resp = get_violations(xray_json) - page_violation_count = resp['violations'].length - puts "Total violations count is #{resp['total_violations']}" - if resp['total_violations'] > 0 - puts "Number of Violations in page #{page_number} are #{page_violation_count}" - resp['violations'].each {|v| violations_channel = process(v, violations_channel) } - page_number += 1 if next_page?(page_violation_count) - end - end - timer_task.execute - - violations_channel - end - - def violation_details(violations_channel) - violations_channel.each do |v| - Concurrent::Promises.future(v) do |v| - process_violation_details(v['violation_details_url']) - pos_file = PositionFile.new(@pos_file_path) - puts "Adding issue #{v['issue_id']} to position file at #{@pos_file_path}" - pos_file.write(v) - end - end - end - - def process_violation_details(xray_violation_detail_url) - begin - detailResp_json = data_normalization(get_violations_detail(xray_violation_detail_url)) - time = Fluent::Engine.now - puts "Emitting normalized Xray Violation #{detailResp_json['issue_id']}" - @router.emit(@tag, time, detailResp_json) - rescue => e - puts "Process Violation details error: #{e}" - raise Fluent::ConfigError, "Process Violation details error: #{e}" - end - end - - def get_violations_detail(xray_violation_detail_url) - if !@token.nil? && @token != '' - response = RestClient::Request.new( - :method => :get, - :url => @jpd_url + xray_violation_detail_url[xray_violation_detail_url.index('/xray/'),xray_violation_detail_url.length], - :headers => { :accept => :json, :content_type => :json, Authorization:'Bearer ' + @token } - ) - elsif !@api_key.nil? && @api_key != '' - response = RestClient::Request.new( - :method => :get, - :url => @jpd_url + xray_violation_detail_url[xray_violation_detail_url.index('/xray/'),xray_violation_detail_url.length], - :user => @username, - :password => @api_key - ) - end - - response.execute do |response, request, result| - case response.code - when 200 - return JSON.parse(response.to_s) - else - puts "Validation failed error (cannot reach Artifactory to pull Xray Violation details): #{response.to_json}" - raise Fluent::ConfigError, "Validation failed error (cannot reach Artifactory to pull Xray Violation details): #{response.to_json}" - end - end - end - - def data_normalization(detailResp_json) - cve = [] - cvss_v2_list = [] - cvss_v3_list = [] - policy_list = [] - rule_list = [] - impacted_artifact_url_list = [] - if detailResp_json.key?('properties') - properties = detailResp_json['properties'] - for index in 0..properties.length-1 do - if properties[index].key?('cve') - cve.push(properties[index]['cve']) - end - if properties[index].key?('cvss_v2') - cvss_v2_list.push(properties[index]['cvss_v2']) - end - if properties[index].key?('cvss_v3') - cvss_v3_list.push(properties[index]['cvss_v3']) - end - end - - detailResp_json["cve"] = cve.sort.reverse[0] - cvss_v2 = cvss_v2_list.sort.reverse[0] - cvss_v3 = cvss_v3_list.sort.reverse[0] - if !cvss_v3.nil? - cvss = cvss_v3 - elsif !cvss_v2.nil? - cvss = cvss_v2 - end - cvss_score = cvss[0..2] - cvss_version = cvss.split(':')[1][0..2] - detailResp_json["cvss_score"] = cvss_score - detailResp_json["cvss_version"] = cvss_version - end - - if detailResp_json.key?('matched_policies') - matched_policies = detailResp_json['matched_policies'] - for index in 0..matched_policies.length-1 do - if matched_policies[index].key?('policy') - policy_list.push(matched_policies[index]['policy']) - end - if matched_policies[index].key?('rule') - rule_list.push(matched_policies[index]['rule']) - end - end - detailResp_json['policies'] = policy_list - detailResp_json['rules'] = rule_list - end - - detailResp_json['impacted_artifacts'].each do |impacted_artifact| - matchdata = impacted_artifact.match /default\/(?<repo_name>[^\/]*)\/(?<path>.*)/ - impacted_artifact_url = matchdata['repo_name'] + ":" + matchdata['path'] + " " - impacted_artifact_url_list.append(impacted_artifact_url) - end - detailResp_json['impacted_artifacts_url'] = impacted_artifact_url_list - return detailResp_json - end - - def process(violation, violations_channel) - pos_file = PositionFile.new(@pos_file_path) - unless pos_file.processed?(violation) - violations_channel << violation - else - puts "Violation #{violation['issue_id']} is already processed" - end - #violations_channel << violation unless pos_file.processed?(violation) - violations_channel - end - - private - def get_violations(xray_json) - if !@token.nil? && @token != '' - puts "Validating JPD access token and fetching violations" - response = RestClient::Request.new( - :method => :post, - :url => @jpd_url + "/xray/api/v1/violations", - :payload => xray_json.to_json, - :headers => { :accept => :json, :content_type => :json, Authorization:'Bearer ' + @token } - ) - elsif !@api_key.nil? && @api_key != '' - puts "Validating JPD API Key and fetching violations" - response = RestClient::Request.new( - :method => :post, - :url => @jpd_url + "/xray/api/v1/violations", - :payload => xray_json.to_json, - :user => @username, - :password => @api_key, - :headers => { :accept => :json, :content_type => :json } - ) - end - response.execute do |response, request, result| - case response.code - when 200 - return JSON.parse(response.to_str) - else - puts "Validation failed error (cannot reach Artifactory to pull Xray Violations): #{response.to_json}" - raise Fluent::ConfigError, "Validation failed error (cannot reach Artifactory to pull Xray Violations): #{response.to_json}" - end - end - end - - def next_page?(count) - count == @batch_size - end - -end - +require 'concurrent' +require 'concurrent-edge' +require 'json' +require "fluent/plugin/position_file" + +class Xray + def initialize(jpd_url, username, api_key, token, wait_interval, batch_size, pos_file_path, router, tag) + @jpd_url = jpd_url + @username = username + @api_key = api_key + @token = token + @wait_interval = wait_interval + @batch_size = batch_size + @pos_file_path = pos_file_path + @router = router + @tag = tag + end + + def violations(date_since) + violations_channel = Concurrent::Channel.new(capacity: @batch_size) + page_number = 1 + timer_task = Concurrent::TimerTask.new(execution_interval: @wait_interval, timeout_interval: 30) do + xray_json = {"filters": { "created_from": date_since }, "pagination": {"order_by": "created","limit": @batch_size ,"offset": page_number } } + puts "Fetching Xray Violations with #{xray_json} parameters" + resp = get_violations(xray_json) + page_violation_count = resp['violations'].length + puts "Total violations count is #{resp['total_violations']}" + if resp['total_violations'] > 0 + puts "Number of Violations in page #{page_number} are #{page_violation_count}" + resp['violations'].each {|v| violations_channel = process(v, violations_channel) } + page_number += 1 if next_page?(page_violation_count) + end + end + timer_task.execute + + violations_channel + end + + def violation_details(violations_channel) + violations_channel.each do |v| + Concurrent::Promises.future(v) do |v| + process_violation_details(v['violation_details_url']) + pos_file = PositionFile.new(@pos_file_path) + puts "Adding issue #{v['issue_id']} to position file at #{@pos_file_path}" + pos_file.write(v) + end + end + end + + def process_violation_details(xray_violation_detail_url) + begin + detailResp_json = data_normalization(get_violations_detail(xray_violation_detail_url)) + time = Fluent::Engine.now + puts "Emitting normalized Xray Violation #{detailResp_json['issue_id']}" + @router.emit(@tag, time, detailResp_json) + rescue => e + puts "Process Violation details error: #{e}" + raise Fluent::ConfigError, "Process Violation details error: #{e}" + end + end + + def get_violations_detail(xray_violation_detail_url) + if !@token.nil? && @token != '' + response = RestClient::Request.new( + :method => :get, + :url => @jpd_url + xray_violation_detail_url[xray_violation_detail_url.index('/xray/'),xray_violation_detail_url.length], + :headers => { :accept => :json, :content_type => :json, Authorization:'Bearer ' + @token } + ) + elsif !@api_key.nil? && @api_key != '' + response = RestClient::Request.new( + :method => :get, + :url => @jpd_url + xray_violation_detail_url[xray_violation_detail_url.index('/xray/'),xray_violation_detail_url.length], + :user => @username, + :password => @api_key + ) + end + + response.execute do |response, request, result| + case response.code + when 200 + return JSON.parse(response.to_s) + else + puts "Validation failed error (cannot reach Artifactory to pull Xray Violation details): #{response.to_json}" + raise Fluent::ConfigError, "Validation failed error (cannot reach Artifactory to pull Xray Violation details): #{response.to_json}" + end + end + end + + def data_normalization(detailResp_json) + cve = [] + cvss_v2_list = [] + cvss_v3_list = [] + policy_list = [] + rule_list = [] + impacted_artifact_url_list = [] + if detailResp_json.key?('properties') + properties = detailResp_json['properties'] + for index in 0..properties.length-1 do + if properties[index].key?('cve') + cve.push(properties[index]['cve']) + end + if properties[index].key?('cvss_v2') + cvss_v2_list.push(properties[index]['cvss_v2']) + end + if properties[index].key?('cvss_v3') + cvss_v3_list.push(properties[index]['cvss_v3']) + end + end + + detailResp_json["cve"] = cve.sort.reverse[0] + cvss_v2 = cvss_v2_list.sort.reverse[0] + cvss_v3 = cvss_v3_list.sort.reverse[0] + if !cvss_v3.nil? + cvss = cvss_v3 + elsif !cvss_v2.nil? + cvss = cvss_v2 + end + cvss_score = cvss[0..2] + cvss_version = cvss.split(':')[1][0..2] + detailResp_json["cvss_score"] = cvss_score + detailResp_json["cvss_version"] = cvss_version + end + + if detailResp_json.key?('matched_policies') + matched_policies = detailResp_json['matched_policies'] + for index in 0..matched_policies.length-1 do + if matched_policies[index].key?('policy') + policy_list.push(matched_policies[index]['policy']) + end + if matched_policies[index].key?('rule') + rule_list.push(matched_policies[index]['rule']) + end + end + detailResp_json['policies'] = policy_list + detailResp_json['rules'] = rule_list + end + + detailResp_json['impacted_artifacts'].each do |impacted_artifact| + matchdata = impacted_artifact.match /default\/(?<repo_name>[^\/]*)\/(?<path>.*)/ + if matchdata + impacted_artifact_url = matchdata['repo_name'] + ":" + matchdata['path'] + " " + impacted_artifact_url_list.append(impacted_artifact_url) + else + impacted_artifact_url_list.append(impacted_artifact) + end + end + detailResp_json['impacted_artifacts_url'] = impacted_artifact_url_list + return detailResp_json + end + + def process(violation, violations_channel) + pos_file = PositionFile.new(@pos_file_path) + unless pos_file.processed?(violation) + violations_channel << violation + else + puts "Violation #{violation['issue_id']} is already processed" + end + #violations_channel << violation unless pos_file.processed?(violation) + violations_channel + end + + private + def get_violations(xray_json) + if !@token.nil? && @token != '' + puts "Validating JPD access token and fetching violations" + response = RestClient::Request.new( + :method => :post, + :url => @jpd_url + "/xray/api/v1/violations", + :payload => xray_json.to_json, + :headers => { :accept => :json, :content_type => :json, Authorization:'Bearer ' + @token } + ) + elsif !@api_key.nil? && @api_key != '' + puts "Validating JPD API Key and fetching violations" + response = RestClient::Request.new( + :method => :post, + :url => @jpd_url + "/xray/api/v1/violations", + :payload => xray_json.to_json, + :user => @username, + :password => @api_key, + :headers => { :accept => :json, :content_type => :json } + ) + end + response.execute do |response, request, result| + case response.code + when 200 + return JSON.parse(response.to_str) + else + puts "Validation failed error (cannot reach Artifactory to pull Xray Violations): #{response.to_json}" + raise Fluent::ConfigError, "Validation failed error (cannot reach Artifactory to pull Xray Violations): #{response.to_json}" + end + end + end + + def next_page?(count) + count == @batch_size + end + +end +