From 2a133f826faaa7b5211cb288e976a49aa670f583 Mon Sep 17 00:00:00 2001 From: Slavo <133103846+svteb@users.noreply.github.com> Date: Tue, 9 Jul 2024 18:30:10 +0200 Subject: [PATCH] Feat: Rework k8s_tshark functionality and apply the changes in tests (#2097) Refs: #2072 #2087 - Prior functionality was bound to a fixed time of execution (120s), which introduced problems in testing (tshark session ending before the test began). - New functionality mainly implements infinite tshark execution along with the possibility of terminating it when deemed appropriate. This is complemented with robust error handling and termination of the tshark process on unexpected crashes during initialization. NOTE: The main tests currently do not handle states where a crash could occur elsewhere and thus a hanging tshark session can still happen (although unlikely). - The module is properly commented, which should allow the user to get a quick understanding of its functionality. - The user functionality remains the same with easier-to-comprehend function names. - Handling of PIDs is rather problematic due to the nature of the exec_by_node_bg function, which does not return the PID of the tshark process but rather the PID of the shell executing it (unverified). This is why the retrieval of the PID may seem rather complicated (especially the pid_command variable). Possible solutions are listed in a comment, but these don't quite work for various reasons (globbing issues, return of incorrect PID, etc.). As for the kill -15 and kill -9 repetition, some tshark sessions would get stuck in a zombie state if the commands were not executed in this order. 
Signed-off-by: svteb --- src/tasks/utils/cnf_manager.cr | 6 +- src/tasks/utils/k8s_tshark.cr | 316 +++++++++++++++++------------ src/tasks/utils/oran_monitor.cr | 35 ++-- src/tasks/workload/5g_validator.cr | 70 ++----- 4 files changed, 232 insertions(+), 195 deletions(-) diff --git a/src/tasks/utils/cnf_manager.cr b/src/tasks/utils/cnf_manager.cr index 8eff904a1..5810a1426 100644 --- a/src/tasks/utils/cnf_manager.cr +++ b/src/tasks/utils/cnf_manager.cr @@ -865,8 +865,9 @@ module CNFManager baselines = JaegerManager.unique_services_total Log.info { "baselines: #{baselines}" } end + # todo start tshark monitoring the e2 traffic - tshark_log_name = ORANMonitor.start_e2_capture?(config.cnf_config) + capture = ORANMonitor.start_e2_capture?(config.cnf_config) # todo separate out install methods into a module/function that accepts a block liveness_time = 0 @@ -1061,9 +1062,10 @@ module CNFManager else tracing_used = false end + if ORANMonitor.isCNFaRIC?(config.cnf_config) sleep 30 - e2_found = ORANMonitor.e2_session_established?(tshark_log_name) + e2_found = ORANMonitor.e2_session_established?(capture) else e2_found = false end diff --git a/src/tasks/utils/k8s_tshark.cr b/src/tasks/utils/k8s_tshark.cr index 11da993bd..c9d8a7f60 100644 --- a/src/tasks/utils/k8s_tshark.cr +++ b/src/tasks/utils/k8s_tshark.cr @@ -5,148 +5,212 @@ require "halite" module K8sTshark + class TsharkPacketCapture + property capture_file_path : String + property pid : Int32? + private property node_match : JSON::Any? - def self.log_of_tshark_by_label(command, label_key, label_value, duration="120") : String - Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" } + def initialize + @capture_file_path = "" + end + + def finalize + if @pid + terminate_capture + end + end + + # Method to provide a block context for capture by label. 
+ def self.begin_capture_by_label(label_key : String, label_value : String, command : String = "", &block : TsharkPacketCapture ->) + capture = new + begin + capture.begin_capture_by_label(label_key, label_value, command) + yield capture + ensure + capture.terminate_capture + end + end + + # Method to provide a block context for capture by node. + def self.begin_capture_by_node(node : JSON::Any, command : String = "", &block : TsharkPacketCapture ->) + capture = new + begin + capture.begin_capture_by_node(node, command) + yield capture + ensure + capture.terminate_capture + end + end + + # Starts a tshark packet capture on the node where the pod with the specified label is running. + # label_key and label_value: Used to identify the pod's label. + # command: Parameters to be passed to tshark. + def begin_capture_by_label(label_key : String, label_value : String, command : String = "") + Log.info { "Searching for the pod matching the label '#{label_key}:#{label_value}'."} all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list) - pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value) - first_labeled_pod = pods[0]? 
- Log.info { "first_labeled_pod: #{first_labeled_pod}" } - if first_labeled_pod && first_labeled_pod.dig?("metadata", "name") - Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" } - pod_name = first_labeled_pod.dig("metadata", "name") - Log.info { "pod_name: #{pod_name}" } - nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod) - node = nodes.first - #create a unique name for the log - # rnd = Random.new - # name_id = rnd.next_int - # tshark_log_name = "/tmp/tshark-#{name_id}.json" - # Log.info { "tshark_log_name #{tshark_log_name}" } - # - # #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - # #command= -ni any -Y nas_5gs.mm.type_id -T json - # #todo check if tshark running already to keep from saturating network - # #todo play with reducing default duration - # ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - # ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node) - # Log.info { "after exec by node bg" } - # resp = tshark_log_name - resp = log_of_tshark_by_node(command, node, duration) + pod_match = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value).first? + + unless pod_match && pod_match.dig?("metadata", "name") + error_message = "Pod with label '#{label_key}:#{label_value}' could not be found." + Log.error { error_message } + raise K8sTsharkError.new(error_message) + end + + pod_name = pod_match.dig("metadata", "name") + Log.info { "Pod '#{pod_name}'' matches the label '#{label_key}:#{label_value}'." } + + Log.info { "Searching for the node running the pod '#{pod_name}'." } + @node_match = KubectlClient::Get.nodes_by_pod(pod_match).first + + unless @node_match && @node_match.not_nil!.dig?("metadata", "name") + error_message = "Node for pod '#{pod_name}' could not be found." 
+ Log.error { error_message } + raise K8sTsharkError.new(error_message) + end + + node_name = node_match.not_nil!.dig("metadata", "name") + Log.info { "Pod '#{pod_name}' is running on node '#{node_name}'." } + + begin_capture_common(command) + end + + # Starts a tshark packet capture on the specified node. + # node: The node where the capture should be performed. + # command: Parameters to be passed to tshark. + # duration: Optional; specifies the capture duration, eliminating the need to call terminate_capture manually. + def begin_capture_by_node(node : JSON::Any, command : String = "") + @node_match = node + begin_capture_common(command) + end + + # Common method to unify capture by label and node. + private def begin_capture_common(command : String) + if @pid + Log.warn { "Ongoing capture process exists, terminate it or create a new capture." } + return + end + + Log.info { "Starting tshark capture with command: #{command}." } + + @capture_file_path = generate_capture_file_path() + Log.info { "Capturing packets on path: #{@capture_file_path}." } + + # Other possible options to resolve the pid conundrum: + # 1. pgrep tshark -f -x "tshark #{command}" + # 2. ...; echo $! > /tmp/pidfile + # 3. bake in 'echo $$!', retrieve from some file + # 4. fix kubectl_client to return the pid of the process that is launched, not the shell + pid_file = "/tmp/pidfile" + pid_command = "ps -eo pid,cmd,start --sort=start_time | grep '[t]shark' | tail -1 | awk '{print $1}' > #{pid_file}" + capture_command = "tshark #{command} > #{@capture_file_path} 2>&1" + + launch_capture(capture_command) + retrieve_pid(pid_command, pid_file) + end + + # Terminates the tshark packet capture process. + def terminate_capture + if @pid + Log.info { "Terminating packet capture with PID: #{@pid}." } + Log.info { "Capture collected on path: #{@capture_file_path}." } + + # Some tshark captures were left in zombie states if only kill/kill -9 was invoked. 
+ ClusterTools.exec_by_node_bg("kill -15 #{@pid}", @node_match.not_nil!) + sleep 1 + ClusterTools.exec_by_node_bg("kill -9 #{@pid}", @node_match.not_nil!) + + @pid = nil + @node_match = nil else - resp = "label key:#{label_key} value: #{label_value} not found" + Log.warn { "No active capture process to terminate." } end - Log.info { "resp #{resp}" } - resp - end + end - def self.log_of_tshark_by_label_bg(command, label_key, label_value, duration="120") : String - Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" } - all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list) - pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value) - first_labeled_pod = pods[0]? - Log.info { "first_labeled_pod: #{first_labeled_pod}" } - if first_labeled_pod && first_labeled_pod.dig?("metadata", "name") - Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" } - pod_name = first_labeled_pod.dig("metadata", "name") - Log.info { "pod_name: #{pod_name}" } - nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod) - node = nodes.first - #create a unique name for the log - # rnd = Random.new - # name_id = rnd.next_int - # tshark_log_name = "/tmp/tshark-#{name_id}.json" - # Log.info { "tshark_log_name #{tshark_log_name}" } - # - # #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - # #command= -ni any -Y nas_5gs.mm.type_id -T json - # #todo check if tshark running already to keep from saturating network - # #todo play with reducing default duration - # ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - # ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node) - # Log.info { "after exec by node bg" } - # resp = tshark_log_name - resp = log_of_tshark_by_node_bg(command, node, duration: 
"120") - else - resp = "label key:#{label_key} value: #{label_value} not found" + # Searches the capture file for lines matching the specified regex pattern. + # Returns an array of matching lines. + def regex_search(pattern : Regex) : Array(String) + matches = [] of String + + if @capture_file_path.empty? + Log.warn { "Cannot find a match using a regular expression before a capture has been started." } + else + Log.info { "Collecting lines matching the regular expression #{pattern}." } + file_content = File.read(@capture_file_path) + matches = file_content.scan(pattern).map(&.string) + Log.debug { "Printing out matching lines:\n#{matches}" } + end + + matches end - Log.info { "resp #{resp}" } - resp - end + # Checks if any line in the capture file matches the specified regex pattern. + # Returns true if a match is found, otherwise false. + def regex_match?(pattern : Regex) : Bool + if @capture_file_path.empty? + Log.warn { "Cannot find a match using a regular expression before a capture has been started." } + else + Log.info { "Finding a match for regular expression: #{pattern} in file: #{capture_file_path}." } + file_content = File.read(@capture_file_path) + if file_content.scan(pattern).any? 
+ Log.debug { "Match found for regular expression: #{pattern}" } + return true + end + end - def self.log_of_tshark_by_node(command, node, duration="120") : String - Log.info { "log_of_tshark_by_node: command #{command}" } - #create a unique name for the log - rnd = Random.new - name_id = rnd.next_int.abs - tshark_log_name = "/tmp/tshark-#{name_id}.json" - Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" } - - #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - #command= -ni any -Y nas_5gs.mm.type_id -T json - #todo check if tshark running already to keep from saturating network - ClusterTools.exec_by_node("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - Log.info { "after exec by node bg" } - tshark_log_name - end + false + end - def self.log_of_tshark_by_node_bg(command, node, duration="120") : String - Log.info { "log_of_tshark_by_node: command #{command}" } - #create a unique name for the log - rnd = Random.new - name_id = rnd.next_int.abs - tshark_log_name = "/tmp/tshark-#{name_id}.json" - Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" } - - #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - #command= -ni any -Y nas_5gs.mm.type_id -T json - #todo check if tshark running already to keep from saturating network - ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - Log.info { "after exec by node bg" } - tshark_log_name - end + # Retrieves the file path where the capture is stored. 
+ def get_capture_file_path : String + @capture_file_path + end + private def generate_capture_file_path : String + name_id = Random.new.next_int.abs - def self.regex_tshark_log_scan(regex, tshark_log_name) - Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" } - resp = File.read("#{tshark_log_name}") - Log.debug { "tshark_log_name resp: #{resp}" } - if resp - Log.debug { "resp: #{resp}" } - ret = resp.scan(regex) - else - Log.info { "file empty" } - ret = nil + "/tmp/tshark-#{name_id}.pcap" end - Log.info { "#{regex}: #{ret}" } - ret - end - def self.regex_tshark_log_match(regex, tshark_log_name) - Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" } - resp = File.read("#{tshark_log_name}") - Log.info { "tshark_log_name resp: #{resp}" } - if resp - Log.info { "resp: #{resp}" } - ret = resp =~ regex - else - Log.info { "file empty" } - ret = nil + private def launch_capture(command : String) + begin + # Start tshark capture. + ClusterTools.exec_by_node_bg(command, @node_match.not_nil!) + rescue ex : Exception + error_message = "Could not start tshark capture process: #{ex.message}" + Log.error { error_message } + raise K8sTsharkError.new(error_message) + end end - Log.info { "#{regex}: #{ret}" } - ret - end - def self.regex_tshark_log(regex, tshark_log_name) - Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" } - regex_found : Bool | Nil - if regex_tshark_log_match(regex, tshark_log_name) - regex_found = true - else - regex_found = false + private def retrieve_pid(command : String, pid_file : String) + begin + # Store the pid of the tshark process. + ClusterTools.exec_by_node_bg(command, @node_match.not_nil!) 
+ + error_message = "Could not retrieve the PID of the tshark process" + + # Wait for pidfile to be readable using repeat_with_timeout + pid_found = repeat_with_timeout(timeout: 10, errormsg: error_message) do + File.exists?(pid_file) && File.size(pid_file) > 0 + end + + if pid_found + @pid = File.read(pid_file).strip.to_i + Log.info { "tshark process started with PID: #{@pid}" } + else + # Attempt to kill the capture process if it was started. + ClusterTools.exec_by_node_bg("pkill -15 tshark && sleep 1 && pkill -9 tshark", @node_match.not_nil!) + raise K8sTsharkError.new(error_message) + end + ensure + File.delete(pid_file) if File.exists?(pid_file) + end end - regex_found end + class K8sTsharkError < Exception + def initialize(message : String) + super(message) + end + end end diff --git a/src/tasks/utils/oran_monitor.cr b/src/tasks/utils/oran_monitor.cr index f019f14ef..2b7182f61 100644 --- a/src/tasks/utils/oran_monitor.cr +++ b/src/tasks/utils/oran_monitor.cr @@ -24,49 +24,46 @@ module ORANMonitor Log.info { "start_e2_capture" } ric_key : String = "" ric_value : String = "" - tshark_log_name : String | Nil + capture : K8sTshark::TsharkPacketCapture | Nil ric = isCNFaRIC?(cnf_config) if ric ric_key = ric[:ric_key] ric_value = ric[:ric_value] command = "-i any -f 'sctp port 36421' -d 'sctp.port==36421,e2ap' -T json" - # tshark_log_name = K8sTshark.log_of_tshark_by_label(command, ric_key, core_value) #todo check all nodes? 
nodes = KubectlClient::Get.schedulable_nodes_list node = nodes.first + resp = ClusterTools.exec_by_node("tshark --version", node) Log.info { "tshark must be version 4.0.3 or higher" } Log.info { "tshark output #{resp[:output]}" } - tshark_log_name = K8sTshark.log_of_tshark_by_node_bg(command,node) + + capture = K8sTshark::TsharkPacketCapture.new + capture.begin_capture_by_node(node, command) else - tshark_log_name = nil + capture = nil end - tshark_log_name + + capture end - def self.e2_session_established?(tshark_log_name) - Log.info { "e2_session_established tshark_log_name: #{tshark_log_name}" } + def self.e2_session_established?(capture : K8sTshark::TsharkPacketCapture | Nil) + Log.info { "e2_session_established" } e2_found : Bool = false - if tshark_log_name && - !tshark_log_name.empty? && - (tshark_log_name =~ /not found/) == nil - - if K8sTshark.regex_tshark_log(/e2ap\.successfulOutcome_element[^$]*e2ap.ric_ID/, tshark_log_name) + + if !capture.nil? + if capture.regex_match?(/e2ap\.successfulOutcome_element[^$]*e2ap.ric_ID/) Log.info { "regex found " } e2_found = true else Log.info { "regex not found " } - e2_found = false end + Log.info { "found e2ap.successfulOutcome_element followed by e2ap.ric_ID?: #{e2_found}" } - #todo delete log file - else - e2_found = false - puts "no e2 log".colorize(:red) + capture.terminate_capture end + e2_found end - end - diff --git a/src/tasks/workload/5g_validator.cr b/src/tasks/workload/5g_validator.cr index ae8f52bf4..d99ce7411 100644 --- a/src/tasks/workload/5g_validator.cr +++ b/src/tasks/workload/5g_validator.cr @@ -50,23 +50,14 @@ task "smf_upf_heartbeat" do |t, args| smf_value = config.cnf_config[:smf_label].split("=").last if upf if smf && upf - #todo document 3gpp standard for heartbeat command = "-ni any -Y 'pfcp.msg_type == 1 or pfcp.msg_type == 2' -T json" - #Baseline + # If baseline is not set unless baseline_count - tshark_log_name = K8sTshark.log_of_tshark_by_label(command, smf_key, smf_value, duration: "120") 
- if tshark_log_name && - !tshark_log_name.empty? && - (tshark_log_name =~ /not found/) == nil - scan = K8sTshark.regex_tshark_log_scan(/"pfcp\.msg_type": "(1|2)"/, tshark_log_name) - if scan - baseline_count = scan.size - Log.info { "Baseline matches: #{baseline_count}" } - end - end + baseline_count = smf_up_heartbeat_capture_matches_count(smf_key, smf_value, command) end + Log.info { "Baseline matches: #{baseline_count}" } #todo accept list of resilience tests #todo loop through all resilience tests @@ -76,26 +67,16 @@ task "smf_upf_heartbeat" do |t, args| spawn do Log.info { "before invoke of pod delete" } args.named["pod_labels"]="#{smf},#{upf}" - # t.invoke("pod_delete", args) + # t.invoke("pod_delete", args) t.invoke("pod_network_latency", args) Log.info { "after invoke of pod delete" } sync_channel.send(nil) end Log.info { "Main pod delete thread continuing" } - - tshark_log_name = K8sTshark.log_of_tshark_by_label(command, smf_key, smf_value, duration: "120") - if tshark_log_name && - !tshark_log_name.empty? 
&& - (tshark_log_name =~ /not found/) == nil - - Log.info { "TShark Log File: #{tshark_log_name}" } - scan = K8sTshark.regex_tshark_log_scan(/"pfcp\.msg_type": "(1|2)"/, tshark_log_name) - if scan - chaos_count = scan.size - Log.info { "Chaos Matches: #{chaos_count}" } - end - end + # Chaos matches + chaos_count = smf_up_heartbeat_capture_matches_count(smf_key, smf_value, command) + Log.info { "Chaos Matches: #{chaos_count}" } Log.info { "before pod delete receive" } sync_channel.receive @@ -119,7 +100,6 @@ task "smf_upf_heartbeat" do |t, args| Log.info { "Heartbeat not found" } heartbeat_found = false end - else heartbeat_found = false puts "no 5g labels".colorize(:red) @@ -134,6 +114,13 @@ task "smf_upf_heartbeat" do |t, args| end end +def smf_up_heartbeat_capture_matches_count(smf_key : String, smf_value : String, command : String) + K8sTshark::TsharkPacketCapture.begin_capture_by_label(smf_key, smf_value, command) do |capture| + sleep 60 + capture.regex_search(/"pfcp\.msg_type": "(1|2)"/).size + end +end + #todo move to 5g test files desc "Test if a 5G core supports SUCI Concealment" task "suci_enabled" do |t, args| @@ -145,33 +132,21 @@ task "suci_enabled" do |t, args| core_value : String = "" core_key = config.cnf_config[:amf_label].split("=").first if core core_value = config.cnf_config[:amf_label].split("=").last if core - if core - - command = "-ni any -Y nas_5gs.mm.type_id -T json" - tshark_log_name = K8sTshark.log_of_tshark_by_label_bg(command, core_key, core_value) - if tshark_log_name && - !tshark_log_name.empty? && - (tshark_log_name =~ /not found/) == nil + command = "-ni any -Y nas_5gs.mm.type_id -T json" + if core + K8sTshark::TsharkPacketCapture.begin_capture_by_label(core_key, core_value, command) do |capture| #todo put in prereq UERANSIM.install(config) sleep 30.0 - #TODO 5g RAN (only) mobile traffic check ???? + # TODO 5g RAN (only) mobile traffic check ???? 
# use suci encyption but don't use a null encryption key - if K8sTshark.regex_tshark_log(/"nas_5gs.mm.type_id": "1"/, tshark_log_name) && - - !K8sTshark.regex_tshark_log(/"nas_5gs.mm.suci.scheme_id": "0"/, tshark_log_name) && - !K8sTshark.regex_tshark_log(/"nas_5gs.mm.suci.pki": "0"/, tshark_log_name) - suci_found = true - else - suci_found = false - end - Log.info { "found nas_5gs.mm.type_id: 1: #{suci_found}" } + suci_found = capture.regex_match?(/"nas_5gs.mm.type_id": "1"/) && \ + !capture.regex_match?(/"nas_5gs.mm.suci.scheme_id": "0"/) && \ + !capture.regex_match?(/"nas_5gs.mm.suci.pki": "0"/) + Log.info { "suci_found: #{suci_found}" } #todo delete log file - else - suci_found = false - puts "no 5g labels".colorize(:red) end else suci_found = false @@ -186,7 +161,6 @@ task "suci_enabled" do |t, args| end ensure Helm.delete("ueransim") - ClusterTools.uninstall ClusterTools.install end