diff --git a/src/tasks/utils/cnf_manager.cr b/src/tasks/utils/cnf_manager.cr
index 8eff904a1..5810a1426 100644
--- a/src/tasks/utils/cnf_manager.cr
+++ b/src/tasks/utils/cnf_manager.cr
@@ -865,8 +865,9 @@ module CNFManager
         baselines = JaegerManager.unique_services_total
         Log.info { "baselines: #{baselines}" }
       end
+
       # todo start tshark monitoring the e2 traffic
-      tshark_log_name = ORANMonitor.start_e2_capture?(config.cnf_config)
+      capture = ORANMonitor.start_e2_capture?(config.cnf_config)
 
       # todo separate out install methods into a module/function that accepts a block
       liveness_time = 0
@@ -1061,9 +1062,10 @@ module CNFManager
       else
         tracing_used = false
       end
+
       if ORANMonitor.isCNFaRIC?(config.cnf_config)
         sleep 30
-        e2_found = ORANMonitor.e2_session_established?(tshark_log_name)
+        e2_found = ORANMonitor.e2_session_established?(capture)
       else
         e2_found = false
       end
diff --git a/src/tasks/utils/k8s_tshark.cr b/src/tasks/utils/k8s_tshark.cr
index 11da993bd..c9d8a7f60 100644
--- a/src/tasks/utils/k8s_tshark.cr
+++ b/src/tasks/utils/k8s_tshark.cr
@@ -5,148 +5,212 @@ require "halite"
 
 module K8sTshark
+  class TsharkPacketCapture
+    property capture_file_path : String
+    property pid : Int32?
+    private property node_match : JSON::Any?
 
-  def self.log_of_tshark_by_label(command, label_key, label_value, duration="120") : String
-    Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" }
+    def initialize
+      @capture_file_path = ""
+    end
+
+    def finalize
+      if @pid
+        terminate_capture
+      end
+    end
+
+    # Method to provide a block context for capture by label.
+    # The block's return value is passed through, so callers can compute a result from the capture.
+    def self.begin_capture_by_label(label_key : String, label_value : String, command : String = "", &block : TsharkPacketCapture -> _)
+      capture = new
+      begin
+        capture.begin_capture_by_label(label_key, label_value, command)
+        yield capture
+      ensure
+        capture.terminate_capture
+      end
+    end
+
+    # Method to provide a block context for capture by node.
+    # The block's return value is passed through, so callers can compute a result from the capture.
+    def self.begin_capture_by_node(node : JSON::Any, command : String = "", &block : TsharkPacketCapture -> _)
+      capture = new
+      begin
+        capture.begin_capture_by_node(node, command)
+        yield capture
+      ensure
+        capture.terminate_capture
+      end
+    end
+
+    # Starts a tshark packet capture on the node where the pod with the specified label is running.
+    # label_key and label_value: Used to identify the pod's label.
+    # command: Parameters to be passed to tshark.
+    def begin_capture_by_label(label_key : String, label_value : String, command : String = "")
+      Log.info { "Searching for the pod matching the label '#{label_key}:#{label_value}'." }
       all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list)
-    pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value)
-    first_labeled_pod = pods[0]?
- Log.info { "first_labeled_pod: #{first_labeled_pod}" } - if first_labeled_pod && first_labeled_pod.dig?("metadata", "name") - Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" } - pod_name = first_labeled_pod.dig("metadata", "name") - Log.info { "pod_name: #{pod_name}" } - nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod) - node = nodes.first - #create a unique name for the log - # rnd = Random.new - # name_id = rnd.next_int - # tshark_log_name = "/tmp/tshark-#{name_id}.json" - # Log.info { "tshark_log_name #{tshark_log_name}" } - # - # #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - # #command= -ni any -Y nas_5gs.mm.type_id -T json - # #todo check if tshark running already to keep from saturating network - # #todo play with reducing default duration - # ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - # ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node) - # Log.info { "after exec by node bg" } - # resp = tshark_log_name - resp = log_of_tshark_by_node(command, node, duration) + pod_match = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value).first? + + unless pod_match && pod_match.dig?("metadata", "name") + error_message = "Pod with label '#{label_key}:#{label_value}' could not be found." + Log.error { error_message } + raise K8sTsharkError.new(error_message) + end + + pod_name = pod_match.dig("metadata", "name") + Log.info { "Pod '#{pod_name}'' matches the label '#{label_key}:#{label_value}'." } + + Log.info { "Searching for the node running the pod '#{pod_name}'." } + @node_match = KubectlClient::Get.nodes_by_pod(pod_match).first + + unless @node_match && @node_match.not_nil!.dig?("metadata", "name") + error_message = "Node for pod '#{pod_name}' could not be found." + Log.error { error_message } + raise K8sTsharkError.new(error_message) + end + + node_name = node_match.not_nil!.dig("metadata", "name") + Log.info { "Pod '#{pod_name}' is running on node '#{node_name}'." } + + begin_capture_common(command) + end + + # Starts a tshark packet capture on the specified node. + # node: The node where the capture should be performed. + # command: Parameters to be passed to tshark. + # duration: Optional; specifies the capture duration, eliminating the need to call terminate_capture manually. + def begin_capture_by_node(node : JSON::Any, command : String = "") + @node_match = node + begin_capture_common(command) + end + + # Common method to unify capture by label and node. + private def begin_capture_common(command : String) + if @pid + Log.warn { "Ongoing capture process exists, terminate it or create a new capture." } + return + end + + Log.info { "Starting tshark capture with command: #{command}." } + + @capture_file_path = generate_capture_file_path() + Log.info { "Capturing packets on path: #{@capture_file_path}." } + + # Other possible options to resolve the pid conundrum: + # 1. pgrep tshark -f -x "tshark #{command}" + # 2. ...; echo $! > /tmp/pidfile + # 3. bake in 'echo $$!', retrieve from some file + # 4. 
fix kubectl_client to return the pid of the process that is launched, not the shell + pid_file = "/tmp/pidfile" + pid_command = "ps -eo pid,cmd,start --sort=start_time | grep '[t]shark' | tail -1 | awk '{print $1}' > #{pid_file}" + capture_command = "tshark #{command} > #{@capture_file_path} 2>&1" + + launch_capture(capture_command) + retrieve_pid(pid_command, pid_file) + end + + # Terminates the tshark packet capture process. + def terminate_capture + if @pid + Log.info { "Terminating packet capture with PID: #{@pid}." } + Log.info { "Capture collected on path: #{@capture_file_path}." } + + # Some tshark captures were left in zombie states if only kill/kill -9 was invoked. + ClusterTools.exec_by_node_bg("kill -15 #{@pid}", @node_match.not_nil!) + sleep 1 + ClusterTools.exec_by_node_bg("kill -9 #{@pid}", @node_match.not_nil!) + + @pid = nil + @node_match = nil else - resp = "label key:#{label_key} value: #{label_value} not found" + Log.warn { "No active capture process to terminate." } end - Log.info { "resp #{resp}" } - resp - end + end - def self.log_of_tshark_by_label_bg(command, label_key, label_value, duration="120") : String - Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" } - all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list) - pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value) - first_labeled_pod = pods[0]? - Log.info { "first_labeled_pod: #{first_labeled_pod}" } - if first_labeled_pod && first_labeled_pod.dig?("metadata", "name") - Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" } - pod_name = first_labeled_pod.dig("metadata", "name") - Log.info { "pod_name: #{pod_name}" } - nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod) - node = nodes.first - #create a unique name for the log - # rnd = Random.new - # name_id = rnd.next_int - # tshark_log_name = "/tmp/tshark-#{name_id}.json" - # Log.info { "tshark_log_name #{tshark_log_name}" } - # - # #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - # #command= -ni any -Y nas_5gs.mm.type_id -T json - # #todo check if tshark running already to keep from saturating network - # #todo play with reducing default duration - # ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - # ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node) - # Log.info { "after exec by node bg" } - # resp = tshark_log_name - resp = log_of_tshark_by_node_bg(command, node, duration: "120") - else - resp = "label key:#{label_key} value: #{label_value} not found" + # Searches the capture file for lines matching the specified regex pattern. + # Returns an array of matching lines. + def regex_search(pattern : Regex) : Array(String) + matches = [] of String + + if @capture_file_path.empty? + Log.warn { "Cannot find a match using a regular expression before a capture has been started." } + else + Log.info { "Collecting lines matching the regular expression #{pattern}." } + file_content = File.read(@capture_file_path) + matches = file_content.scan(pattern).map(&.string) + Log.debug { "Printing out matching lines:\n#{matches}" } + end + + matches end - Log.info { "resp #{resp}" } - resp - end + # Checks if any line in the capture file matches the specified regex pattern. + # Returns true if a match is found, otherwise false. 
+ def regex_match?(pattern : Regex) : Bool + if @capture_file_path.empty? + Log.warn { "Cannot find a match using a regular expression before a capture has been started." } + else + Log.info { "Finding a match for regular expression: #{pattern} in file: #{capture_file_path}." } + file_content = File.read(@capture_file_path) + if file_content.scan(pattern).any? + Log.debug { "Match found for regular expression: #{pattern}" } + return true + end + end - def self.log_of_tshark_by_node(command, node, duration="120") : String - Log.info { "log_of_tshark_by_node: command #{command}" } - #create a unique name for the log - rnd = Random.new - name_id = rnd.next_int.abs - tshark_log_name = "/tmp/tshark-#{name_id}.json" - Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" } - - #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - #command= -ni any -Y nas_5gs.mm.type_id -T json - #todo check if tshark running already to keep from saturating network - ClusterTools.exec_by_node("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - Log.info { "after exec by node bg" } - tshark_log_name - end + false + end - def self.log_of_tshark_by_node_bg(command, node, duration="120") : String - Log.info { "log_of_tshark_by_node: command #{command}" } - #create a unique name for the log - rnd = Random.new - name_id = rnd.next_int.abs - tshark_log_name = "/tmp/tshark-#{name_id}.json" - Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" } - - #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log - #command= -ni any -Y nas_5gs.mm.type_id -T json - #todo check if tshark running already to keep from saturating network - ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node) - Log.info { "after exec by node bg" } - tshark_log_name - end + # Retrieves the file path where the capture is stored. + def get_capture_file_path : String + @capture_file_path + end + private def generate_capture_file_path : String + name_id = Random.new.next_int.abs - def self.regex_tshark_log_scan(regex, tshark_log_name) - Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" } - resp = File.read("#{tshark_log_name}") - Log.debug { "tshark_log_name resp: #{resp}" } - if resp - Log.debug { "resp: #{resp}" } - ret = resp.scan(regex) - else - Log.info { "file empty" } - ret = nil + "/tmp/tshark-#{name_id}.pcap" end - Log.info { "#{regex}: #{ret}" } - ret - end - def self.regex_tshark_log_match(regex, tshark_log_name) - Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" } - resp = File.read("#{tshark_log_name}") - Log.info { "tshark_log_name resp: #{resp}" } - if resp - Log.info { "resp: #{resp}" } - ret = resp =~ regex - else - Log.info { "file empty" } - ret = nil + private def launch_capture(command : String) + begin + # Start tshark capture. + ClusterTools.exec_by_node_bg(command, @node_match.not_nil!) 
+ rescue ex : Exception + error_message = "Could not start tshark capture process: #{ex.message}" + Log.error { error_message } + raise K8sTsharkError.new(error_message) + end end - Log.info { "#{regex}: #{ret}" } - ret - end - def self.regex_tshark_log(regex, tshark_log_name) - Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" } - regex_found : Bool | Nil - if regex_tshark_log_match(regex, tshark_log_name) - regex_found = true - else - regex_found = false + private def retrieve_pid(command : String, pid_file : String) + begin + # Store the pid of the tshark process. + ClusterTools.exec_by_node_bg(command, @node_match.not_nil!) + + error_message = "Could not retrieve the PID of the tshark process" + + # Wait for pidfile to be readable using repeat_with_timeout + pid_found = repeat_with_timeout(timeout: 10, errormsg: error_message) do + File.exists?(pid_file) && File.size(pid_file) > 0 + end + + if pid_found + @pid = File.read(pid_file).strip.to_i + Log.info { "tshark process started with PID: #{@pid}" } + else + # Attempt to kill the capture process if it was started. + ClusterTools.exec_by_node_bg("pkill -15 tshark && sleep 1 && pkill -9 tshark", @node_match.not_nil!) + raise K8sTsharkError.new(error_message) + end + ensure + File.delete(pid_file) if File.exists?(pid_file) + end end - regex_found end + class K8sTsharkError < Exception + def initialize(message : String) + super(message) + end + end end diff --git a/src/tasks/utils/oran_monitor.cr b/src/tasks/utils/oran_monitor.cr index f019f14ef..2b7182f61 100644 --- a/src/tasks/utils/oran_monitor.cr +++ b/src/tasks/utils/oran_monitor.cr @@ -24,49 +24,46 @@ module ORANMonitor Log.info { "start_e2_capture" } ric_key : String = "" ric_value : String = "" - tshark_log_name : String | Nil + capture : K8sTshark::TsharkPacketCapture | Nil ric = isCNFaRIC?(cnf_config) if ric ric_key = ric[:ric_key] ric_value = ric[:ric_value] command = "-i any -f 'sctp port 36421' -d 'sctp.port==36421,e2ap' -T json" - # tshark_log_name = K8sTshark.log_of_tshark_by_label(command, ric_key, core_value) #todo check all nodes? nodes = KubectlClient::Get.schedulable_nodes_list node = nodes.first + resp = ClusterTools.exec_by_node("tshark --version", node) Log.info { "tshark must be version 4.0.3 or higher" } Log.info { "tshark output #{resp[:output]}" } - tshark_log_name = K8sTshark.log_of_tshark_by_node_bg(command,node) + + capture = K8sTshark::TsharkPacketCapture.new + capture.begin_capture_by_node(node, command) else - tshark_log_name = nil + capture = nil end - tshark_log_name + + capture end - def self.e2_session_established?(tshark_log_name) - Log.info { "e2_session_established tshark_log_name: #{tshark_log_name}" } + def self.e2_session_established?(capture : K8sTshark::TsharkPacketCapture | Nil) + Log.info { "e2_session_established" } e2_found : Bool = false - if tshark_log_name && - !tshark_log_name.empty? && - (tshark_log_name =~ /not found/) == nil - - if K8sTshark.regex_tshark_log(/e2ap\.successfulOutcome_element[^$]*e2ap.ric_ID/, tshark_log_name) + + if !capture.nil? 
+        if capture.regex_match?(/e2ap\.successfulOutcome_element[^$]*e2ap\.ric_ID/)
           Log.info { "regex found " }
           e2_found = true
         else
           Log.info { "regex not found " }
-          e2_found = false
         end
+
       Log.info { "found e2ap.successfulOutcome_element followed by e2ap.ric_ID?: #{e2_found}" }
-      #todo delete log file
-    else
-      e2_found = false
-      puts "no e2 log".colorize(:red)
+      capture.terminate_capture
     end
+
     e2_found
   end
-
 end
-
diff --git a/src/tasks/workload/5g_validator.cr b/src/tasks/workload/5g_validator.cr
index ae8f52bf4..d99ce7411 100644
--- a/src/tasks/workload/5g_validator.cr
+++ b/src/tasks/workload/5g_validator.cr
@@ -50,23 +50,14 @@ task "smf_upf_heartbeat" do |t, args|
   smf_value = config.cnf_config[:smf_label].split("=").last if upf
 
   if smf && upf
-    #todo document 3gpp standard for heartbeat
     command = "-ni any -Y 'pfcp.msg_type == 1 or pfcp.msg_type == 2' -T json"
 
-    #Baseline
+    # If baseline is not set
    unless baseline_count
-      tshark_log_name = K8sTshark.log_of_tshark_by_label(command, smf_key, smf_value, duration: "120")
-      if tshark_log_name &&
-          !tshark_log_name.empty? &&
-          (tshark_log_name =~ /not found/) == nil
-        scan = K8sTshark.regex_tshark_log_scan(/"pfcp\.msg_type": "(1|2)"/, tshark_log_name)
-        if scan
-          baseline_count = scan.size
-          Log.info { "Baseline matches: #{baseline_count}" }
-        end
-      end
+      baseline_count = smf_upf_heartbeat_capture_matches_count(smf_key, smf_value, command)
    end
+    Log.info { "Baseline matches: #{baseline_count}" }
 
    #todo accept list of resilience tests
    #todo loop through all resilience tests
    spawn do
      Log.info { "before invoke of pod delete" }
      args.named["pod_labels"]="#{smf},#{upf}"
-      # t.invoke("pod_delete", args)
+      # t.invoke("pod_delete", args)
      t.invoke("pod_network_latency", args)
      Log.info { "after invoke of pod delete" }
      sync_channel.send(nil)
    end
    Log.info { "Main pod delete thread continuing" }
-
-    tshark_log_name = K8sTshark.log_of_tshark_by_label(command, smf_key, smf_value, duration: "120")
-    if tshark_log_name &&
-      !tshark_log_name.empty? &&
-      (tshark_log_name =~ /not found/) == nil
-
-      Log.info { "TShark Log File: #{tshark_log_name}" }
-      scan = K8sTshark.regex_tshark_log_scan(/"pfcp\.msg_type": "(1|2)"/, tshark_log_name)
-      if scan
-        chaos_count = scan.size
-        Log.info { "Chaos Matches: #{chaos_count}" }
-      end
-    end
+    # Chaos matches
+    chaos_count = smf_upf_heartbeat_capture_matches_count(smf_key, smf_value, command)
+    Log.info { "Chaos Matches: #{chaos_count}" }
 
    Log.info { "before pod delete receive" }
    sync_channel.receive
@@ -119,7 +100,6 @@ task "smf_upf_heartbeat" do |t, args|
      Log.info { "Heartbeat not found" }
      heartbeat_found = false
    end
-
  else
    heartbeat_found = false
    puts "no 5g labels".colorize(:red)
@@ -134,6 +114,13 @@ task "smf_upf_heartbeat" do |t, args|
  end
end
+
+def smf_upf_heartbeat_capture_matches_count(smf_key : String, smf_value : String, command : String) : Int32
+  K8sTshark::TsharkPacketCapture.begin_capture_by_label(smf_key, smf_value, command) do |capture|
+    sleep 60
+    capture.regex_search(/"pfcp\.msg_type": "(1|2)"/).size
+  end
+end
+
 #todo move to 5g test files
 desc "Test if a 5G core supports SUCI Concealment"
 task "suci_enabled" do |t, args|
  core_value : String = ""
  core_key = config.cnf_config[:amf_label].split("=").first if core
  core_value = config.cnf_config[:amf_label].split("=").last if core
-  if core
-
-    command = "-ni any -Y nas_5gs.mm.type_id -T json"
-    tshark_log_name = K8sTshark.log_of_tshark_by_label_bg(command, core_key, core_value)
-    if tshark_log_name &&
-        !tshark_log_name.empty? &&
-        (tshark_log_name =~ /not found/) == nil
+  command = "-ni any -Y nas_5gs.mm.type_id -T json"
+  if core
+    K8sTshark::TsharkPacketCapture.begin_capture_by_label(core_key, core_value, command) do |capture|
      #todo put in prereq
      UERANSIM.install(config)
      sleep 30.0
-      #TODO 5g RAN (only) mobile traffic check ????
+      # TODO 5g RAN (only) mobile traffic check ????
      # use suci encyption but don't use a null encryption key
-      if K8sTshark.regex_tshark_log(/"nas_5gs.mm.type_id": "1"/, tshark_log_name) &&
-
-          !K8sTshark.regex_tshark_log(/"nas_5gs.mm.suci.scheme_id": "0"/, tshark_log_name) &&
-          !K8sTshark.regex_tshark_log(/"nas_5gs.mm.suci.pki": "0"/, tshark_log_name)
-        suci_found = true
-      else
-        suci_found = false
-      end
-      Log.info { "found nas_5gs.mm.type_id: 1: #{suci_found}" }
+      suci_found = capture.regex_match?(/"nas_5gs.mm.type_id": "1"/) &&
+                   !capture.regex_match?(/"nas_5gs.mm.suci.scheme_id": "0"/) &&
+                   !capture.regex_match?(/"nas_5gs.mm.suci.pki": "0"/)
+      Log.info { "suci_found: #{suci_found}" }
      #todo delete log file
-    else
-      suci_found = false
-      puts "no 5g labels".colorize(:red)
    end
  else
    suci_found = false
@@ -186,7 +161,6 @@
  end
ensure
  Helm.delete("ueransim")
-  ClusterTools.uninstall
  ClusterTools.install
end
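For reference, a minimal usage sketch of the block-based capture API introduced above. The "app:smf" label and the tshark filter are illustrative placeholders rather than values from the test suite, and the sketch assumes the block-passthrough return type on begin_capture_by_label:

  # Capture PFCP heartbeat requests on the node that runs the first pod
  # matching the (hypothetical) label app:smf. terminate_capture is invoked
  # automatically by the ensure clause when the block exits.
  matches = K8sTshark::TsharkPacketCapture.begin_capture_by_label("app", "smf", "-ni any -Y 'pfcp.msg_type == 1' -T json") do |capture|
    sleep 30
    capture.regex_search(/"pfcp\.msg_type": "1"/).size
  end
  Log.info { "observed #{matches} PFCP heartbeat requests" }

The same capture handle also exposes regex_match? for boolean checks and capture_file_path for direct access to the collected output.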