From 131a09c66cfd91762012a651d10b4016b06005e0 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Wed, 6 May 2026 22:23:55 -0700 Subject: [PATCH 01/14] py-json/lf_attenmod.py: Remove unnecessary printout Signed-off-by: Alex Gavin --- py-json/lf_attenmod.py | 1 - 1 file changed, 1 deletion(-) diff --git a/py-json/lf_attenmod.py b/py-json/lf_attenmod.py index e29a4f189..8c36c4076 100644 --- a/py-json/lf_attenmod.py +++ b/py-json/lf_attenmod.py @@ -50,7 +50,6 @@ def set_command_param(self, command_name, param_name, param_value): self.atten_data[param_name] = param_value def show(self): - logger.info("Show Attenuators.........") response = self.json_get("/attenuators/") if response is None: logger.critical(response) From 79d69820deb4cc06f0580a1e65eec11a7869198c Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 11:30:22 -0700 Subject: [PATCH 02/14] attenuator_serial.py: Remove unnecessary printout Signed-off-by: Alex Gavin --- py-scripts/attenuator_serial.py | 1 - 1 file changed, 1 deletion(-) diff --git a/py-scripts/attenuator_serial.py b/py-scripts/attenuator_serial.py index 73b85f756..4cf0b007b 100755 --- a/py-scripts/attenuator_serial.py +++ b/py-scripts/attenuator_serial.py @@ -39,7 +39,6 @@ def __init__(self, lfclient_host, lfclient_port, debug_=False): def show(self, debug=False): ser_no_list = [] - print("Show Attenuators.........") response = self.json_get("/attenuators/") time.sleep(0.01) if response is None: From 344255124b5a78a5c3d3f7afc1dd4708c5221a84 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Tue, 5 May 2026 17:50:23 -0700 Subject: [PATCH 03/14] lf_roam_test.py: Move argument parsing to helper Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 0c9f9026d..b88b7d480 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -2058,17 +2058,7 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): logging.info(str(e)) -def main(): - help_summary = '''\ - The script is designed to support both hard and soft roaming, ensuring a smooth transition for devices between - access points (APs). Additionally, the script captures packets in two scenarios: when a device is connected to - an AP and when it roams from one AP to another. These captured packets help analyze the performance and stability - of the roaming process. In essence, the script serves as a thorough test for assessing how well APs handle - roaming and the overall network stability when clients move between different access points. - - The roaming test will create stations with advanced/802.1x and 11r key management, create CX traffic between upstream - port and stations, run traffic and generate a report. - ''' +def parse_args(): parser = argparse.ArgumentParser( prog='lf_roam_test.py', # formatter_class=argparse.RawDescriptionHelpFormatter, @@ -2232,7 +2222,22 @@ def main(): parser.add_argument('--help_summary', help='Show summary of what this script does', default=None, action="store_true") - args = parser.parse_args() + return parser.parse_args() + + +def main(): + help_summary = '''\ + The script is designed to support both hard and soft roaming, ensuring a smooth transition for devices between + access points (APs). Additionally, the script captures packets in two scenarios: when a device is connected to + an AP and when it roams from one AP to another. These captured packets help analyze the performance and stability + of the roaming process. In essence, the script serves as a thorough test for assessing how well APs handle + roaming and the overall network stability when clients move between different access points. + + The roaming test will create stations with advanced/802.1x and 11r key management, create CX traffic between upstream + port and stations, run traffic and generate a report. + ''' + + args = parse_args() # help summary if args.help_summary: From 3f9aeec338736fcbb41941fa1629b64f24d6d235 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Tue, 5 May 2026 18:00:53 -0700 Subject: [PATCH 04/14] lf_roam_test.py: Update LF access arguments to match other scripts Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index b88b7d480..255eeff49 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -2172,9 +2172,23 @@ def parse_args(): ''') required = parser.add_argument_group('Required arguments') - required.add_argument('--mgr', help='lanforge ip', default="192.168.100.221") - required.add_argument('--lanforge_port', help='lanforge port', type=int, default=8080) - required.add_argument('--lanforge_ssh_port', help='lanforge ssh port', type=int, default=22) + # LANforge connection settings + parser.add_argument("-m", "--mgr", + dest="lanforge_ip", + type=str, + default="localhost", + help="Hostname or IP address of the LANforge GUI machine (localhost is default)") + parser.add_argument("-o", "--port", "--lanforge_port", + dest="lanforge_port", + type=int, + default=8080, + help="IP Port the LANforge GUI is listening on (8080 is default)") + parser.add_argument("--lf_ssh_port", "--lanforge_ssh_port", + dest="lanforge_ssh_port", + type=int, + default=22, + help="LANforge system SSH port used to SSH in and pull reports") + required.add_argument('--ap1_bssid', type=str, help='AP1 bssid', default="68:7d:b4:5f:5c:3b") required.add_argument('--ap2_bssid', type=str, help='AP2 bssid', default="14:16:9d:53:58:cb") required.add_argument('--twog_radios', help='Twog radio', default=None) From 5c7a3140f8f6061ae65548c266a65b22a25e7dd0 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Tue, 5 May 2026 18:35:23 -0700 Subject: [PATCH 05/14] lf_roam_test.py: argument parsing readability, use **vars() initializer pattern Shorten help summary. Ordinarily wouldn't mix and match this much Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 138 +++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 66 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 255eeff49..7ffa40282 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -190,8 +190,8 @@ def __init__(self, lanforge_ip=None, debug=False, soft_roam=False, sta_type=None, - multicast=None - ): + multicast=None, + **kwargs): super().__init__(lanforge_ip, lanforge_port) self.lanforge_ip = lanforge_ip @@ -2189,20 +2189,71 @@ def parse_args(): default=22, help="LANforge system SSH port used to SSH in and pull reports") - required.add_argument('--ap1_bssid', type=str, help='AP1 bssid', default="68:7d:b4:5f:5c:3b") - required.add_argument('--ap2_bssid', type=str, help='AP2 bssid', default="14:16:9d:53:58:cb") - required.add_argument('--twog_radios', help='Twog radio', default=None) - required.add_argument('--fiveg_radios', help='Fiveg radio', default="1.1.wiphy1") - required.add_argument('--sixg_radios', help='Sixg radio', default=None) - required.add_argument('--band', help='eg. --band "twog" or sixg', default="fiveg") - required.add_argument('--sniff_radio', help='eg. --sniff_radio "wiphy2', default="wiphy2") - required.add_argument('--num_sta', help='eg. --num_sta 1', type=int, default=1) - required.add_argument('--ssid_name', help='eg. --ssid_name "ssid_5g"', default="RoamAP5g") - required.add_argument('--security', help='eg. --security "wpa2"', default="wpa2") - required.add_argument('--security_key', help='eg. --security_key "something"', default="something") - required.add_argument('--upstream', help='eg. --upstream "eth2"', default="eth2") + # Test settings + required.add_argument('--band', + default="fiveg", + help="Band in which to run test (e.g. 'fiveg')") + required.add_argument("--iters", "--iteration", "--iterations", + dest="iteration", + type=int, + default=1, + help="Number of test iterations") + + required.add_argument('--ap1_bssid', + dest="c1_bssid", + type=str, + default="68:7d:b4:5f:5c:3b", + help='AP1 bssid') + required.add_argument('--ap2_bssid', + dest="c2_bssid", + type=str, + default="14:16:9d:53:58:cb", + help='AP2 bssid') + + # Packet capture settings + required.add_argument('--sniff_radio', + default="wiphy2", + help="Radio used for packet capture") + + # Station creation settings + required.add_argument('--num_sta', + type=int, + default=1, + help="Number of stations to create") + + required.add_argument('--twog_radios', + dest="twog_radio", + default=None, + help="2.4 GHz radios (used for station creation)") + required.add_argument('--fiveg_radios', + dest="fiveg_radio", + default="1.1.wiphy1", + help="5 GHz radios (used for station creation)") + required.add_argument('--sixg_radios', + dest="sixg_radio", + default="1.1.wiphy1", + help="6 GHz radios (used for station creation)") + + required.add_argument("--ssid", "--ssid_name", + type=str, + default="RoamAP5g", + help="Common SSID for AP DUTs (used for station creation)") + required.add_argument('--security', + type=str, + default="wpa2", + help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >') + required.add_argument("--password", "--security_key", + dest="security_key", + type=str, + default="something") + + required.add_argument('--upstream', + type=str, + default="eth2", + help="Upstream port for traffic generation") + + # TODO: Cleanup and move to respective sections required.add_argument('--duration', help='duration', default=None) - required.add_argument('--iteration', help='Number of iterations', type=int, default=1) required.add_argument('--channel', help='Channel', type=str, default="40") required.add_argument('--option', help='eg. --option "ota', default="ota") required.add_argument('--iteration_based', help='Iteration based', default=False, action='store_true') @@ -2222,6 +2273,7 @@ def parse_args(): optional = parser.add_argument_group('Optional arguments') + # AP controller parameters optional.add_argument('--scheme', help='', default="ssh") optional.add_argument('--dest', help='', default="localhost") optional.add_argument('--user', help='', default="admin") @@ -2240,63 +2292,17 @@ def parse_args(): def main(): - help_summary = '''\ - The script is designed to support both hard and soft roaming, ensuring a smooth transition for devices between - access points (APs). Additionally, the script captures packets in two scenarios: when a device is connected to - an AP and when it roams from one AP to another. These captured packets help analyze the performance and stability - of the roaming process. In essence, the script serves as a thorough test for assessing how well APs handle - roaming and the overall network stability when clients move between different access points. - - The roaming test will create stations with advanced/802.1x and 11r key management, create CX traffic between upstream - port and stations, run traffic and generate a report. - ''' - args = parse_args() - # help summary + help_summary = "This script will test station roaming, both hard (forced) and soft (attenuation-based)" \ + "and generate a report upon completion. Support for packet capture is also included." + if args.help_summary: print(help_summary) exit(0) - obj = HardRoam(lanforge_ip=args.mgr, - lanforge_port=args.lanforge_port, - lanforge_ssh_port=args.lanforge_ssh_port, - c1_bssid=args.ap1_bssid, - c2_bssid=args.ap2_bssid, - fiveg_radio=args.fiveg_radios, - twog_radio=args.twog_radios, - sixg_radio=args.sixg_radios, - band=args.band, - sniff_radio_=args.sniff_radio, - num_sta=args.num_sta, - security=args.security, - security_key=args.security_key, - ssid=args.ssid_name, - upstream=args.upstream, - duration=args.duration, - iteration=args.iteration, - channel=args.channel, - option=args.option, - duration_based=args.duration_based, - iteration_based=args.iteration_based, - dut_name=args.dut_name, - traffic_type=args.traffic_type, - scheme="ssh", - dest="localhost", - user="admin", - passwd="Cisco123", - prompt="WLC2", - series_cc="9800", - ap="AP687D.B45C.1D1C", - port="8888", - band_cc="5g", - timeout="10", - identity=args.identity, - ttls_pass=args.ttls_pass, - soft_roam=args.soft_roam, - sta_type=args.sta_type, - multicast=args.multicast - ) + obj = HardRoam(**vars(args), sniff_radio_=args.sniff_radio) + x = os.getcwd() print("Current Working Directory :", x) file = obj.generate_csv() From 9e906cc5d5fafd68f97b7d369822ffc2308033d2 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Wed, 6 May 2026 22:38:08 -0700 Subject: [PATCH 06/14] lf_roam_test.py: Pretty-ify reporting labels/titles Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 92 ++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 7ffa40282..f2a5d6c30 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -1839,12 +1839,12 @@ def generate_client_pass_fail_graph(self, csv_list=None): # dataset = [[9, 7 , 4], [1, 3, 4]] graph = lf_graph.lf_bar_graph(_data_set=dataset, _xaxis_name="Total Number Of Stations = " + str(self.num_sta), - _yaxis_name="Total Number of iterations = " + str(self.iteration), - _xaxis_categories=x_axis_category, _label=["PASS", "FAIL"], _xticks_font=8, + _yaxis_name="Total Number of Iterations = " + str(self.iteration), + _xaxis_categories=x_axis_category, _label=["Pass", "Fail"], _xticks_font=8, _graph_image_name="11r roam client per iteration graph", _color=['forestgreen', 'red', 'blueviolet'], _color_edge='black', _figsize=(13, 5), _xaxis_step=1, - _graph_title="Client Performance Over %s Iterations" % (str(self.iteration)), + _graph_title="Client Roaming Performance Over %s Iteration(s)" % (str(self.iteration)), _show_bar_value=True, _text_font=12, _text_rotation=45, _enable_csv=True, _legend_loc="upper right", _legend_fontsize=12, _legend_box=(1.12, 1.01), _remove_border=['top', 'right', 'left'], _alignment={"left": 0.011}, ) @@ -1856,20 +1856,22 @@ def generate_client_pass_fail_graph(self, csv_list=None): logging.info(str(e)) print(str(e)) - # Report generation function def generate_report(self, csv_list, kernel_lst, current_path=None): try: option, band_, station_, iteration__ = None, None, None, None + if self.option == 'ota': option = "OTA" else: option = "OTD" + if self.band == "fiveg": - band_ = "5G" + band_ = "5 GHz" elif self.band == "twog": - band_ = "2G" + band_ = "2 GHz" elif self.band == "sixg": - band_ = "6G" + band_ = "6 GHz" + if int(self.num_sta) > 1: station_ = "Multi" else: @@ -1879,6 +1881,7 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): iteration__ = "Multi" else: iteration__ = "Single" + if self.soft_roam: dir_name = "Soft_Roam_Test_" + str(band_) + "_" + str(option) + "_" + str(station_) + "Client_" + str( iteration__) + "_Iteration" @@ -1889,8 +1892,13 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): iteration__) + "_Iteration" out_html = "hard_roam.html" pdf_name = "Hard_roam_test.pdf" - report = lf_report_pdf.lf_report(_path="", _results_dir_name=dir_name, _output_html=out_html, + + # Initialize report object + report = lf_report_pdf.lf_report(_path="", + _results_dir_name=dir_name, + _output_html=out_html, _output_pdf=pdf_name) + if current_path is not None: report.current_path = os.path.dirname(os.path.abspath(current_path)) report_path = report.get_report_path() @@ -1900,22 +1908,23 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): report.move_data(directory="kernel_log", _file_name=str(i)) date = str(datetime.now()).split(",")[0].replace(" ", "-").split(".")[0] test_setup_info = { - "DUT Name": self.dut_name, + "Name(s)": self.dut_name, "SSID": self.ssid_name, - "Test Duration": self.test_duration, } + + # Report title if self.soft_roam: - report.set_title("SOFT ROAM (11r) TEST") + report.set_title("Soft Roam (11r) Test") else: if self.sta_type == "normal": - report.set_title("HARD ROAM TEST") + report.set_title("Hard Roam Test") else: - report.set_title("HARD ROAM (11r) TEST") + report.set_title("Hard Roam (11r) Test") report.set_date(date) report.build_banner_cover() report.set_table_title("Test Setup Information") report.build_table_title() - report.test_setup_table(value="Device under test", test_setup_data=test_setup_info) + report.test_setup_table(value="Device Under Test", test_setup_data=test_setup_info) report.set_obj_html("Objective", "The Roaming test is a type of performance test that is performed on wireless Access Points (APs)" " to evaluate their ability to support 802.11r (Fast BSS Transition) standard for fast and seamless" @@ -1934,7 +1943,7 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): " allowing for a faster and more secure transition. Soft roaming with 11r is designed to be seamless," " allowing the device to move from one Access Point to another without any interruption in connectivity.") report.build_objective() - report.set_obj_html("Client per iteration Graph", + report.set_obj_html("Client Per-Iteration Graph", "The below graph provides information about out of total iterations how many times each client got Pass or Fail") report.build_objective() graph = self.generate_client_pass_fail_graph(csv_list=csv_list) @@ -2001,58 +2010,63 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): p = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Pcap file Name") lf = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Log File") r = lf_csv_obj.read_csv(file_name=str(report_path) + "/csv_data/" + str(x), column="Remark") + if self.multicast == "True": table = { - "iterations": y, - "Bssid before": z, - "Bssid After": u, - "PASS/FAIL": h, + "Iteration": y, + "BSSID Before Roam": z, + "BSSID After Roam": u, + "Pass/Fail": h, "Remark": r } else: table = { - "iterations": y, - "Bssid before": z, - "Bssid After": u, - "Roam Time(ms)": t, - "PASS/FAIL": h, - "pcap file name": p, + "Iteration": y, + "BSSID Before Roam": z, + "BSSID After Roam": u, + "Roam Time (ms)": t, + "Pass/Fail": h, + "Packet Capture File": p, "Log File": lf, "Remark": r } if self.multicast != "True": if not self.log_file: del table["Log File"] - print("Tabel Data :", table) + + logger.debug("Report table data:", table) test_setup = pd.DataFrame(table) report.set_table_dataframe(test_setup) report.build_table() + if self.option == 'ota': - testname = 'over the air' + testname = "Over the air" else: - testname = 'over the ds' + testname = "Over the DS" + test_input_infor = { - "LANforge ip": self.lanforge_ip, - "LANforge port": self.lanforge_port, - "test start time": self.start_time, - "test end time": self.end_time, + "LANforge IP": self.lanforge_ip, + "LANforge API Port": self.lanforge_port, + "Test Start": self.start_time, + "Test End": self.end_time, "Bands": self.band, - "Upstream": self.upstream, - "Stations": self.num_sta, - "iterations": self.iteration, + "Iterations": self.iteration, + "Upstream Port": self.upstream, + "Stations Count": self.num_sta, "SSID": self.ssid_name, "Security": self.security, - "Client mac": self.mac_data, - 'Test': testname, - "Contact": "support@candelatech.com" + "Client MACs": self.mac_data, + "Test": testname, } - report.set_table_title("Test basic Information") + + report.set_table_title("Test Basic Information") report.build_table_title() report.test_setup_table(value="Information", test_setup_data=test_input_infor) report.build_footer() report.write_html() report.write_pdf_with_timestamp(_page_size='A4', _orientation='Portrait') return report_path + except Exception as e: print(str(e)) logging.info(str(e)) From 585f8f10856173d24c4819687e58ea1cf181939d Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Wed, 6 May 2026 22:46:01 -0700 Subject: [PATCH 07/14] lf_roam_test.py: Remove default-on powersave enable Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index f2a5d6c30..272d755a5 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -373,7 +373,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N end_id_=num_sta - 1, padding_number_=10000, radio=radio) if self.sta_type == "normal": - station_profile.set_command_flag("add_sta", "power_save_enable", 1) if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: @@ -408,7 +407,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if self.option == "otds": print("OTDS present") station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) - station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-PSK ", pairwise="", group="", @@ -444,7 +442,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) - station_profile.set_command_flag("add_sta", "power_save_enable", 1) station_profile.set_wifi_extra(key_mgmt="FT-SAE ", pairwise="", group="", @@ -481,7 +478,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) # station_profile.set_command_flag("add_sta", "disable_roam", 1) - station_profile.set_command_flag("add_sta", "power_save_enable", 1) # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") station_profile.set_wifi_extra(key_mgmt="FT-EAP ", pairwise="[BLANK]", From 0a2feaa02243534eaed592a02cdc591826aa7ced Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 12:15:44 -0700 Subject: [PATCH 08/14] lf_roam_test.py: Remove redundant station cleanup Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 272d755a5..17d9ff021 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -356,17 +356,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N radio = self.twog_radios if self.band == "sixg": radio = self.sixg_radios - sta_list = self.get_station_list() - print("Available list of stations on lanforge-GUI :", sta_list) - logging.info(str(sta_list)) - if not sta_list: - print("No stations are available on lanforge-GUI") - logging.info("No stations are available on lanforge-GUI") - else: - station_profile.cleanup(sta_list, delay=1) - LFUtils.wait_until_ports_disappear(base_url=local_realm.lfclient_url, - port_list=sta_list, - debug=True) + print("Creating stations.") logging.info("Creating stations.") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, From dd0c4366704222046926b898b0734f13a591f4f0 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 13:20:16 -0700 Subject: [PATCH 09/14] lf_roam_test.py: clean up start/end time calculation/logging This also fixes a pretty loud deprecation warning related to not specifying the year. Add the year to to time format to fix Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 17d9ff021..769ab9cc1 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -148,6 +148,9 @@ multicast_profile = importlib.import_module("py-json.multicast_profile") +TIME_FORMAT = '%y %b %d %H:%M:%S' + + class HardRoam(Realm): def __init__(self, lanforge_ip=None, lanforge_port=None, @@ -727,16 +730,12 @@ def run(self, file_n=None): message = None, None # Start Timer - test_time = datetime.now() - test_time = test_time.strftime("%b %d %H:%M:%S") - print("Test started at ", test_time) - logging.info("Test started at " + str(test_time)) - self.start_time = test_time + self.start_time = datetime.now().strftime(TIME_FORMAT) + logger.debug(f"Test start time: {self.start_time}") # Getting two BSSID's for roam self.final_bssid.extend([self.c1_bssid, self.c2_bssid]) - print("Final BSSID's are :", self.final_bssid) - logging.info("Final BSSID's are :" + str(self.final_bssid)) + logger.info(f"Roaming target BSSIDs: {self.final_bssid}") # If 'Soft Roam' is selected, initially set the attenuator to zero. if self.soft_roam: @@ -1777,15 +1776,12 @@ def run(self, file_n=None): else: print("Stations failed to get ip") logging.info("Stations failed to get ip") - test_end = datetime.now() - test_end = test_end.strftime("%b %d %H:%M:%S") - print("Test Ended At ", test_end) - logging.info("Test Ended At " + str(test_end)) - self.end_time = test_end - s1 = test_time - s2 = test_end # for example - fmt = '%b %d %H:%M:%S' - self.test_duration = datetime.strptime(s2, fmt) - datetime.strptime(s1, fmt) + + self.end_time = datetime.now().strftime(TIME_FORMAT) + self.test_duration = datetime.strptime(self.end_time, TIME_FORMAT) - datetime.strptime(self.start_time, TIME_FORMAT) + + logger.info("Test complete") + return kernel_log, message # except Exception as e: From 19511ad0d4130f8cabf13309291d29cedf182369 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 13:34:17 -0700 Subject: [PATCH 10/14] lf_roam_test.py: Change all logging calls to logger Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 320 ++++++++++++++++++------------------- 1 file changed, 160 insertions(+), 160 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 769ab9cc1..b05ec1948 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -361,7 +361,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N radio = self.sixg_radios print("Creating stations.") - logging.info("Creating stations.") + logger.info("Creating stations.") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, end_id_=num_sta - 1, padding_number_=10000, radio=radio) @@ -370,7 +370,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: print("Soft roam true") - logging.info("Soft roam true") + logger.info("Soft roam true") if self.option == "otds": print("OTDS present") station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) @@ -396,7 +396,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: print("Soft roam true") - logging.info("Soft roam true") + logger.info("Soft roam true") if self.option == "otds": print("OTDS present") station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) @@ -499,7 +499,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N anqp_3gpp_cell_net="NA") station_profile.create(radio=radio, sta_names_=station_list) print("Waiting for ports to appear") - logging.info("Waiting for ports to appear") + logger.info("Waiting for ports to appear") local_realm.wait_until_ports_appear(sta_list=station_list) if self.soft_roam: @@ -516,21 +516,21 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N } print(bgscan) - logging.info(str(bgscan)) + logger.info(str(bgscan)) self.local_realm.json_post("/cli-json/set_wifi_custom", bgscan) # time.sleep(2) station_profile.admin_up() print("Waiting for ports to admin up") - logging.info("Waiting for ports to admin up") + logger.info("Waiting for ports to admin up") if local_realm.wait_for_ip(station_list): print("All stations got IPs") - logging.info("All stations got IPs") + logger.info("All stations got IPs") # exit() return True else: print("Stations failed to get IPs") - logging.info("Stations failed to get IPs") + logger.info("Stations failed to get IPs") return False # create a multicast profile @@ -554,10 +554,10 @@ def mcast_stop(self): def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu, side_b_min_pdu, traffic_type, sta_list): print("Station List :", sta_list) - logging.info("Station List : ", str(sta_list)) + logger.info("Station List : ", str(sta_list)) print(type(sta_list)) print("Upstream port :", self.upstream) - logging.info(str(self.upstream)) + logger.info(str(self.upstream)) self.cx_profile.host = self.lanforge_ip self.cx_profile.port = self.lanforge_port self.cx_profile.side_a_min_bps = side_a_min_rate @@ -568,7 +568,7 @@ def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_ self.cx_profile.side_b_min_pdu = side_b_min_pdu, # Create layer3 end points & run traffic print("Creating Endpoints") - logging.info("Creating Endpoints") + logger.info("Creating Endpoints") self.cx_profile.create(endp_type=traffic_type, side_a=sta_list, side_b=self.upstream, sleep_time=0) self.cx_profile.start_cx() @@ -715,16 +715,16 @@ def attenuator_modify(self, serno, idx, val): def run(self, file_n=None): try: print("Setting both attenuators to zero attenuation at the beginning.") - logging.info("Setting both attenuators to zero attenuation at the beginning.") + logger.info("Setting both attenuators to zero attenuation at the beginning.") ser_no = self.attenuator_serial() print("Available attenuators :", ser_no[0], ser_no[1]) - logging.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) + logger.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 0) except Exception as e: - logging.warning(str(e)) + logger.warning(str(e)) finally: kernel_log = [] message = None, None @@ -740,10 +740,10 @@ def run(self, file_n=None): # If 'Soft Roam' is selected, initially set the attenuator to zero. if self.soft_roam: print("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") - logging.info("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") + logger.info("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") ser_no = self.attenuator_serial() print("Available attenuators :", ser_no[0], ser_no[1]) - logging.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) + logger.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] self.attenuator_modify(ser_1, "all", 0) @@ -751,7 +751,7 @@ def run(self, file_n=None): # Start sniffer & Create clients with respect to bands print("Begin sniffing to establish the initial connection.") - logging.info("Begin sniffing to establish the initial connection.") + logger.info("Begin sniffing to establish the initial connection.") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str(self.option) + "start" + "_", duration=3600) @@ -771,10 +771,10 @@ def run(self, file_n=None): # Check if all stations have ip or not sta_list = self.get_station_list() print("Checking for IP and station list :", sta_list) - logging.info("Checking for IP and station list :" + str(sta_list)) + logger.info("Checking for IP and station list :" + str(sta_list)) if sta_list == "no response": print("No response from station") - logging.info("No response from station") + logger.info("No response from station") else: # if all stations got ip check mac address for stations val = self.wait_for_ip(sta_list) mac_list = [] @@ -783,7 +783,7 @@ def run(self, file_n=None): mac = self.station_data_query(station_name=str(sta), query="mac") mac_list.append(mac) print("List of MAC addresses for all stations :", mac_list) - logging.info("List of MAC addresses for all stations :" + str(mac_list)) + logger.info("List of MAC addresses for all stations :" + str(mac_list)) self.mac_data = mac_list # if self.debug: # print("start debug") @@ -791,11 +791,11 @@ def run(self, file_n=None): # print("check for 30 min") # time.sleep(1800) print("Stop Sniffer") - logging.info("Stop Sniffer") + logger.info("Stop Sniffer") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) print("pcap file name :", file_name) - logging.info("pcap file name : " + str(file_name)) + logger.info("pcap file name : " + str(file_name)) # if self.debug: # print("stop debugger") # self.stop_debug_(mac_list=mac_list) @@ -804,24 +804,24 @@ def run(self, file_n=None): if val: # if all station got an ip, then check all station are connected to single AP print("All stations got ip") - logging.info("All stations got ip") + logger.info("All stations got ip") print("Check if all stations are connected single ap") - logging.info("Check if all stations are connected single ap") + logger.info("Check if all stations are connected single ap") # get BSSID'S of all stations print("Get BSSID's of all stations") - logging.info("Get BSSID's of all stations") + logger.info("Get BSSID's of all stations") check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") - logging.info(str(bssid)) + logger.info(str(bssid)) check.append(bssid) print("BSSID of the current connected stations : ", check) - logging.info(str(check)) + logger.info(str(check)) # Check if all the stations in the BSSID list have the same BSSID. print("Verifying whether all BSSID's are identical or not.") - logging.info("Verifying whether all BSSID's are identical or not.") + logger.info("Verifying whether all BSSID's are identical or not.") result = all(element == check[0] for element in check) # if all BSSID's are identical / same, run layer3 traffic b/w station to upstream @@ -839,14 +839,14 @@ def run(self, file_n=None): # if BSSID's are not identical / same, try to move all clients to one ap print("Attempt to ensure that all clients are connected to the same AP before " "initiating a roaming process.") - logging.info("Attempt to ensure that all clients are connected to the same AP before " - "initiating a roaming process.") + logger.info("Attempt to ensure that all clients are connected to the same AP before " + "initiating a roaming process.") count1 = check.count(self.c1_bssid.upper()) count2 = check.count(self.c2_bssid.upper()) checker, new_sta_list, checker2 = None, [], None if count1 > count2: print("Station connected mostly to ap1") - logging.info("Station connected mostly to ap1") + logger.info("Station connected mostly to ap1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() else: @@ -854,11 +854,11 @@ def run(self, file_n=None): checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(check) if x == checker] print(index_count) - logging.info(str(index_count)) + logger.info(str(index_count)) for i in index_count: new_sta_list.append(sta_list[i]) print("new_sta_list", new_sta_list) - logging.info("new_sta_list " + str(new_sta_list)) + logger.info("new_sta_list " + str(new_sta_list)) for sta_name in new_sta_list: eid = self.name_to_eid(sta_name) @@ -866,7 +866,7 @@ def run(self, file_n=None): # sta = sta_name.split(".")[2] # TODO: use name-to-eid sta = eid[2] print(sta) - logging.info(sta) + logger.info(sta) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { "shelf": eid[0], @@ -881,7 +881,7 @@ def run(self, file_n=None): "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) - logging.info(str(wifi_cli_cmd_data)) + logger.info(str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: LANforge sta on same radio will share scan results, so you only need to scan on one STA per # radio, and then sleep should be 5 seconds, then roam every station that needs to roam. @@ -915,7 +915,7 @@ def run(self, file_n=None): print("wait for 5 mins for next roam process") time.sleep(120) print("Value of the Variable : ", variable) - logging.info("Value of the Variable :" + str(variable)) + logger.info("Value of the Variable :" + str(variable)) iterations, number, ser_1, ser_2 = self.iteration, None, None, None if variable != -1: iterations = iterable_var - variable @@ -929,47 +929,47 @@ def run(self, file_n=None): # Get the serial number of attenuators from lf ser_no = self.attenuator_serial() print(ser_no[0]) - logging.info(str(ser_no[0])) + logger.info(str(ser_no[0])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] if self.soft_roam: if iterations % 2 == 0: print("even set c1 to lowest and c2 to highest attenuation ") - logging.info("even set c1 to lowest and c2 to highest attenuation ") + logger.info("even set c1 to lowest and c2 to highest attenuation ") number = "even" print("number", number) - logging.info("number " + str(number)) + logger.info("number " + str(number)) # set attenuation to zero in first attenuator and high in second attenuator self.attenuator_modify(ser_1, "all", 700) self.attenuator_modify(ser_2, "all", 0) else: print("odd, c1 is already at highest and c2 is at lowest") - logging.info("odd, c1 is already at highest and c2 is at lowest") + logger.info("odd, c1 is already at highest and c2 is at lowest") self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 700) # 700 == 300/400 bgscan 15:-70:300 number = "odd" print("number", number) - logging.info("number " + str(number)) + logger.info("number " + str(number)) try: # Define row list per iteration row_list = [] sta_list = self.get_station_list() print("Station list : ", sta_list) - logging.info("Station list :" + str(sta_list)) + logger.info("Station list :" + str(sta_list)) if sta_list == "no response": print("No response") - logging.info("No response") + logger.info("No response") pass else: station = self.wait_for_ip(sta_list) if self.debug: print("Start debug") - logging.info("Start debug") + logger.info("Start debug") self.start_debug_(mac_list=mac_list) if station: print("All stations got ip") - logging.info("All stations got ip") + logger.info("All stations got ip") # Get bssid's of all stations currently connected bssid_list = [] for sta_name in sta_list: @@ -977,7 +977,7 @@ def run(self, file_n=None): bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list.append(bssid) print("BSSID of the current connected stations : ", bssid_list) - logging.info(str(bssid_list)) + logger.info(str(bssid_list)) pass_fail_list = [] pcap_file_list = [] roam_time1 = [] @@ -991,19 +991,19 @@ def run(self, file_n=None): if not result: # Attempt to connect the client to the same AP for each iteration print("Giving a try to connect") - logging.info("Giving a try to connect") + logger.info("Giving a try to connect") print("Move all clients to one AP") - logging.info("Move all clients to one AP") + logger.info("Move all clients to one AP") count3 = bssid_list.count(self.c1_bssid.upper()) count4 = bssid_list.count(self.c2_bssid.upper()) print("Count3", count3) - logging.info("Count3 " + str(count3)) + logger.info("Count3 " + str(count3)) print("Count4", count4) - logging.info("Count4 " + str(count4)) + logger.info("Count4 " + str(count4)) checker, new_sta_list, checker2 = None, [], None if count3 > count4: print("Station connected mostly to AP-1") - logging.info("Station connected mostly to AP-1") + logger.info("Station connected mostly to AP-1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() else: @@ -1011,11 +1011,11 @@ def run(self, file_n=None): checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(bssid_list) if x == checker] print(index_count) - logging.info(str(index_count)) + logger.info(str(index_count)) for i in index_count: new_sta_list.append(sta_list[i]) print("new_sta_list", new_sta_list) - logging.info("new_sta_list " + str(new_sta_list)) + logger.info("new_sta_list " + str(new_sta_list)) # for i, x in zip(bssid_list, sta_list): # if i == checker: # index_count = bssid_list.index(checker) @@ -1027,7 +1027,7 @@ def run(self, file_n=None): eid = self.name_to_eid(sta_name) sta = eid[2] print(sta) - logging.info(str(sta)) + logger.info(str(sta)) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { @@ -1043,7 +1043,7 @@ def run(self, file_n=None): "wpa_cli_cmd": wpa_cmd } print(wifi_cli_cmd_data) - logging.info(str(wifi_cli_cmd_data)) + logger.info(str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) @@ -1052,45 +1052,45 @@ def run(self, file_n=None): for sta_name in sta_list: sta = sta_name.split(".")[2] before_bss = self.station_data_query(station_name=str(sta), query="ap") - logging.info(str(before_bss)) + logger.info(str(before_bss)) before_bssid.append(before_bss) print("BSSID of the current connected stations : ", before_bssid) - logging.info("BSSID of the current connected stations : " + str(before_bssid)) + logger.info("BSSID of the current connected stations : " + str(before_bssid)) if before_bssid[0] == str(self.c1_bssid.upper()): post_bssid = self.c2_bssid.upper() else: post_bssid = self.c1_bssid.upper() print("After roaming, the stations will connect to %s the BSSID" % post_bssid) - logging.info( + logger.info( "After roaming, the stations will connect to " + str(post_bssid) + "the BSSID") result1 = all(element == before_bssid[0] for element in before_bssid) if result1: print("All stations connected to same AP") - logging.info("All stations connected to same AP") + logger.info("All stations connected to same AP") for i in before_bssid: local_row_list = [str(iterations + 1), i] - logging.info(str(local_row_list)) + logger.info(str(local_row_list)) row_list.append(local_row_list) print("Row list :", row_list) - logging.info(str(row_list)) + logger.info(str(row_list)) # if all bssid are equal then do check to which ap it is connected formated_bssid = before_bssid[0].lower() station_before = "" if formated_bssid == self.c1_bssid: print("Station connected to chamber1 AP") - logging.info("Station connected to chamber1 AP") + logger.info("Station connected to chamber1 AP") station_before = formated_bssid elif formated_bssid == self.c2_bssid: print("Station connected to chamber 2 AP") - logging.info("Station connected to chamber 2 AP") + logger.info("Station connected to chamber 2 AP") station_before = formated_bssid print("Current connected stations BSSID", station_before) - logging.info(str(station_before)) + logger.info(str(station_before)) # After checking all conditions start roam and start snifffer print("Starting sniffer") - logging.info("Starting sniffer") + logger.info("Starting sniffer") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str( self.option) + "_iteration_" + str( @@ -1102,25 +1102,25 @@ def run(self, file_n=None): ser_num = ser_1 ser_num2 = ser_2 print("even", ser_num) - logging.info("even " + str(ser_num)) + logger.info("even " + str(ser_num)) elif number == "odd": ser_num = ser_2 ser_num2 = ser_1 print("odd", ser_num) - logging.info("odd " + str(ser_num)) + logger.info("odd " + str(ser_num)) # logic to decrease c2 attenuation till 10 db using 1dbm steps status = None print("checking attenuation") - logging.info("checking attenuation") + logger.info("checking attenuation") print("ser num", ser_num) - logging.info("ser num " + str(ser_num)) + logger.info("ser num " + str(ser_num)) for atten_val2 in range(700, -10, -10): print(atten_val2) self.attenuator_modify(int(ser_num), "all", atten_val2) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. print("wait for 4 secs") - logging.info("wait for 4 secs") + logger.info("wait for 4 secs") # query bssid's of all stations bssid_check = [] for sta_name in sta_list: @@ -1130,7 +1130,7 @@ def run(self, file_n=None): # time.sleep(10) bssid_check.append(bssid) print(bssid_check) - logging.info(str(bssid_check)) + logger.info(str(bssid_check)) # check if all are equal resulta = all(element == bssid_check[0] for element in bssid_check) @@ -1139,29 +1139,29 @@ def run(self, file_n=None): if station_after == "N/A" or station_after == "na": status = "station did not roamed" print("station did not roamed") - logging.info("station did not roamed") + logger.info("station did not roamed") continue if station_after == station_before: status = "station did not roamed" print("station did not roamed") - logging.info("station did not roamed") + logger.info("station did not roamed") continue elif station_after != station_before: print("client performed roam") - logging.info("client performed roam") + logger.info("client performed roam") break if status == "station did not roamed": # set c1 to high for atten_val1 in (range(0, 700, 10)): print(atten_val1) - logging.info(str(atten_val1)) + logger.info(str(atten_val1)) self.attenuator_modify(int(ser_num2), "all", atten_val1) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. # TODO: Add attenuation step to logs to make it more obvious what script is doing. print("wait for 4 secs") - logging.info("wait for 4 secs") + logger.info("wait for 4 secs") bssid_check2 = [] for sta_name in sta_list: sta = sta_name.split(".")[2] @@ -1171,7 +1171,7 @@ def run(self, file_n=None): # time.sleep(10) bssid_check2.append(bssid) print(bssid_check2) - logging.info(str(bssid_check2)) + logger.info(str(bssid_check2)) # check if all are equal result = all(element == bssid_check2[0] for element in bssid_check2) if result: @@ -1179,25 +1179,25 @@ def run(self, file_n=None): if station_after == "N/A" or station_after == "na": # status = "station did not roamed" print("station did not roamed") - logging.info("station did not roamed") + logger.info("station did not roamed") continue if station_after == station_before: # status = "station did not roamed" print("station did not roamed") - logging.info("station did not roamed") + logger.info("station did not roamed") continue else: print('station roamed') - logging.info('station roamed') + logger.info('station roamed') break else: if station_before == self.final_bssid[0]: print("Connected stations bssid is same to bssid list first element") - logging.info( + logger.info( "Connected stations bssid is same to bssid list first element") for sta_name in sta_list: sta = sta_name.split(".")[2] - logging.info(str(sta)) + logger.info(str(sta)) wpa_cmd = "" if self.option == "ota": wpa_cmd = "roam " + str(self.final_bssid[1]) @@ -1217,7 +1217,7 @@ def run(self, file_n=None): "wpa_cli_cmd": wpa_cmd } print("Roam Command : ", wifi_cli_cmd_data) - logging.info("Roam Command : " + str(wifi_cli_cmd_data)) + logger.info("Roam Command : " + str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: See note in similar code above about only needing to scan once per radio @@ -1225,7 +1225,7 @@ def run(self, file_n=None): wifi_cli_cmd_data) else: print("Connected stations bssid is same to bssid list second element") - logging.info( + logger.info( "Connected stations bssid is same to bssid list second element") for sta_name in sta_list: sta = sta_name.split(".")[2] @@ -1234,7 +1234,7 @@ def run(self, file_n=None): wifi_cmd = "roam " + str(self.final_bssid[0]) if self.option == "otds": wifi_cmd = "ft_ds " + str(self.final_bssid[0]) - logging.info(str(sta)) + logger.info(str(sta)) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, @@ -1248,7 +1248,7 @@ def run(self, file_n=None): "wpa_cli_cmd": wifi_cmd } print("Roam Command : ", wifi_cli_cmd_data) - logging.info("Roam Command : " + str(wifi_cli_cmd_data)) + logger.info("Roam Command : " + str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: See note in similar code above about only needing to scan once per radio @@ -1261,18 +1261,18 @@ def run(self, file_n=None): kernel_log.append(i) # Stop sniff & Attach data print("Stop sniffer") - logging.info("Stop sniffer") + logger.info("Stop sniffer") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) print("pcap file name", file_name) - logging.info("pcap file name " + str(file_name)) + logger.info("pcap file name " + str(file_name)) if self.debug: print("Stop debugger") - logging.info("Stop debugger") + logger.info("Stop debugger") self.stop_debug_(mac_list=mac_list) else: print("Debug is disabled") - logging.info("Debug is disabled") + logger.info("Debug is disabled") self.wait_for_ip(sta_list) bssid_list_1 = [] for sta_name in sta_list: @@ -1280,11 +1280,11 @@ def run(self, file_n=None): bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list_1.append(bssid) print("The stations are romed to another AP (%s)" % bssid_list_1) - logging.info("The stations are romed to another AP " + str(bssid_list_1)) + logger.info("The stations are romed to another AP " + str(bssid_list_1)) for i, x in zip(row_list, bssid_list_1): i.append(x) print("Row list, after roam :", row_list) - logging.info("Row list, after roam :" + str(row_list)) + logger.info("Row list, after roam :" + str(row_list)) trace = self.get_file_name(client=self.num_sta) print("Trace file :", trace) log_file.append(trace) @@ -1295,25 +1295,25 @@ def run(self, file_n=None): res = "" station_before_ = before_bssid print("The BSSID of the station before roamed :", station_before_) - logging.info("The BSSID of the station before roamed : " + str(station_before_)) + logger.info("The BSSID of the station before roamed : " + str(station_before_)) # For each mac address query data from pcap for i, x in zip(mac_list, range(len(station_before_))): print("MAC address :", i) - logging.info("MAC address :" + str(i)) + logger.info("MAC address :" + str(i)) print("BSSID :", bssid_list_1) - logging.info(str(bssid_list_1)) + logger.info(str(bssid_list_1)) query_action_frame_time, auth_time = None, None station_after = bssid_list_1[x] print("The connected BSSID for stations, after rome :", station_after) - logging.info( + logger.info( "The connected BSSID for stations, after rome : " + str(station_after)) if station_after == station_before_[x] or station_after == "na": print("Station did not roamed") - logging.info("Station did not roamed") + logger.info("Station did not roamed") res = "FAIL" elif station_after != station_before_[x]: print("Client has performed a roaming operation.") - logging.info("Client has performed a roaming operation.") + logger.info("Client has performed a roaming operation.") res = "PASS" if res == "FAIL": res = "FAIL" @@ -1402,11 +1402,11 @@ def run(self, file_n=None): pyshark_filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print(query_reasso_response) - logging.info(str(query_reasso_response)) + logger.info(str(query_reasso_response)) if len(query_reasso_response) != 0 and query_reasso_response != "empty": if query_reasso_response == "Successful": print("Re-association status is successful") - logging.info("Re-association status is successful") + logger.info("Re-association status is successful") if self.sta_type == "normal": reasso_t = self.pcap_obj.read_time( pcap_file=str(file_name), @@ -1418,10 +1418,10 @@ def run(self, file_n=None): filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("Re-association time is", reasso_t) - logging.info("Re-association time is " + str(reasso_t)) + logger.info("Re-association time is " + str(reasso_t)) if self.option == "otds": print("Checking for Action Frame") - logging.info("Checking for Action Frame") + logger.info("Checking for Action Frame") # Action frame check query_action_frame = self.pcap_obj.check_frame_present( @@ -1431,14 +1431,14 @@ def run(self, file_n=None): print("Action Frame", query_action_frame) if len(query_action_frame) != 0 and query_action_frame != "empty": print("Action frame is present") - logging.info("Action frame is present") + logger.info("Action frame is present") query_action_frame_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) print("Action frame time is", query_action_frame_time) - logging.info( + logger.info( "Action frame time is " + str(reasso_t)) else: roam_time1.append("No Action frame") @@ -1446,10 +1446,10 @@ def run(self, file_n=None): pcap_file_list.append(str(file_name)) remark.append("No Action Frame") print("Row list :", row_list) - logging.info("Row list " + str(row_list)) + logger.info("Row list " + str(row_list)) else: print("Checking for Authentication Frame") - logging.info("Checking for Authentication Frame") + logger.info("Checking for Authentication Frame") if self.sta_type == "normal": query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), @@ -1465,7 +1465,7 @@ def run(self, file_n=None): if len(query_auth_response) != 0 and query_auth_response != "empty": if query_auth_response == "Successful": print("Authentication Request Frame is present") - logging.info( + logger.info( "Authentication Request Frame is present") if self.sta_type == "normal": auth_time = self.pcap_obj.read_time( @@ -1479,7 +1479,7 @@ def run(self, file_n=None): str(i))) print("Authentication Request Frame time is", auth_time) - logging.info( + logger.info( "Authentication Request Frame time is" + str( auth_time)) else: @@ -1493,14 +1493,14 @@ def run(self, file_n=None): pcap_file_list.append(str(file_name)) remark.append("No Auth frame") print("Row list :", row_list) - logging.info("row list " + str(row_list)) + logger.info("row list " + str(row_list)) # roam_time = None if self.option == "otds": roam_time = reasso_t - query_action_frame_time else: roam_time = reasso_t - auth_time print("Roam Time (ms)", roam_time) - logging.info("Roam Time (ms)" + str(roam_time)) + logger.info("Roam Time (ms)" + str(roam_time)) roam_time1.append(roam_time) if self.option == "ota": if roam_time < 50: @@ -1522,7 +1522,7 @@ def run(self, file_n=None): remark.append("Reassociation failure") print( "pcap_file name for fail instance of iteration value ") - logging.info( + logger.info( "pcap_file name for fail instance of iteration value ") else: roam_time1.append("No Reassociation") @@ -1530,26 +1530,26 @@ def run(self, file_n=None): pcap_file_list.append(str(file_name)) remark.append("No Reasso response") print("Row list : ", row_list) - logging.info("row list " + str(row_list)) + logger.info("row list " + str(row_list)) else: query_reasso_response = self.get_wlan_mgt_status( file_name=file_name, pyshark_filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) print("Query_reasso_response:", query_reasso_response) - logging.info(str(query_reasso_response)) + logger.info(str(query_reasso_response)) if len(query_reasso_response) != 0 and query_reasso_response != 'empty': if query_reasso_response == "Successful": print("Re-Association status is successful") - logging.info("Re-Association status is successful") + logger.info("Re-Association status is successful") reasso_t = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( # noqa: E501 str(i))) print("Re-Association time is", reasso_t) - logging.info("Re-Association time is " + str(reasso_t)) + logger.info("Re-Association time is " + str(reasso_t)) if self.option == "otds": print("Check for Action frame") - logging.info("Check for Action Frame") + logger.info("Check for Action Frame") # action frame check query_action_frame = self.pcap_obj.check_frame_present( @@ -1558,14 +1558,14 @@ def run(self, file_n=None): str(i))) if len(query_action_frame) != 0 and query_action_frame != "empty": print("Action Frame is present") - logging.info("Action Frame is present") + logger.info("Action Frame is present") query_action_frame_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) print("Action Frame time is", query_action_frame_time) - logging.info( + logger.info( "Action Frame) time is " + str(reasso_t)) else: roam_time1.append("No Action frame") @@ -1573,10 +1573,10 @@ def run(self, file_n=None): pcap_file_list.append(str(file_name)) remark.append("bssid miNo Action Frame") print("Row list :", row_list) - logging.info("Row list :" + str(row_list)) + logger.info("Row list :" + str(row_list)) else: print("Check for Authentication Frame") - logging.info("Check for Authentication Frame") + logger.info("Check for Authentication Frame") query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( @@ -1584,14 +1584,14 @@ def run(self, file_n=None): if len(query_auth_response) != 0 and query_auth_response != "empty": if query_auth_response == "Successful": print("Authentication Request is present") - logging.info( + logger.info( "Authentication Request is present") auth_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( # noqa: E501 str(i))) print("Authentication time is", auth_time) - logging.info( + logger.info( "Authentication time is " + str(auth_time)) else: roam_time1.append('Auth Fail') @@ -1604,14 +1604,14 @@ def run(self, file_n=None): pcap_file_list.append(str(file_name)) remark.append("bssid mismatched No Auth frame") print("Row list :", row_list) - logging.info("Row list :" + str(row_list)) + logger.info("Row list :" + str(row_list)) # roam_time = None if self.option == "otds": roam_time = reasso_t - query_action_frame_time else: roam_time = reasso_t - auth_time print("Roam time (ms)", roam_time) - logging.info("Roam time (ms) " + str(roam_time)) + logger.info("Roam time (ms) " + str(roam_time)) roam_time1.append(roam_time) if self.option == "ota": if roam_time < 50: @@ -1639,7 +1639,7 @@ def run(self, file_n=None): pcap_file_list.append(str(file_name)) remark.append("BSSID mismatched , No Reasso response") print("Row list :", row_list) - logging.info("row list " + str(row_list)) + logger.info("row list " + str(row_list)) if self.multicast == "True": print(row_list) print(pass_fail_list) @@ -1656,42 +1656,42 @@ def run(self, file_n=None): for i, x in zip(row_list, roam_time1): i.append(x) print("Row list :", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) # for i, x in zip(row_list, packet_loss_lst): # i.append(x) for i, x in zip(row_list, pass_fail_list): i.append(x) print("Row list :", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) for i, x in zip(row_list, pcap_file_list): i.append(x) print("Log file :", log_file) - logging.info("Log file : " + str(log_file)) + logger.info("Log file : " + str(log_file)) my_unnested_list = list(chain(*log_file)) print(my_unnested_list) - logging.info(str(my_unnested_list)) + logger.info(str(my_unnested_list)) for i, x in zip(row_list, my_unnested_list): i.append(x) print("Row list :", row_list) for i, x in zip(row_list, remark): i.append(x) print("Row list :", row_list) - logging.info("row list " + str(row_list)) + logger.info("row list " + str(row_list)) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "all stations are not connected to same ap for iteration " + str( iterations) print("All stations are not connected to same ap") - logging.info("All stations are not connected to same ap") + logger.info("All stations are not connected to same ap") print("Starting Sniffer") - logging.info("Starting Sniffer") + logger.info("Starting Sniffer") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str( self.option) + "_iteration_" + str( iterations) + "_", duration=3600) print("Stop Sniffer") - logging.info("Stop Sniffer") + logger.info("Stop Sniffer") self.stop_sniffer() kernel = self.journal_ctl_logs(file=str(iterations)) for i in kernel: @@ -1703,18 +1703,18 @@ def run(self, file_n=None): sta = sta_name.split(".")[2] before_bssid_ = self.station_data_query(station_name=str(sta), query="ap") print(before_bssid_) - logging.info(str(before_bssid_)) + logger.info(str(before_bssid_)) bssid_list2.append(before_bssid_) local_row_list.append(before_bssid_) print(local_row_list) - logging.info(str(local_row_list)) + logger.info(str(local_row_list)) row_list.append(local_row_list) print(row_list) - logging.info(str(row_list)) + logger.info(str(row_list)) for i, x in zip(row_list, bssid_list2): i.append(x) print("Row list :", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) if self.multicast == "True": for a in row_list: a.append("FAIL") @@ -1723,59 +1723,59 @@ def run(self, file_n=None): for i in row_list: i.append("No Roam Time") print("Row list :", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) for a in row_list: a.append("FAIL") print("Row list :", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) # pcap for i in row_list: i.append("N/A") print("Row list:", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) if self.debug: print("Stop Debugger") - logging.info("Stop Debugger") + logger.info("Stop Debugger") self.stop_debug_(mac_list=mac_list) else: print("Debug is disabled") - logging.info("Debug is disabled") + logger.info("Debug is disabled") trace = self.get_file_name(client=self.num_sta) log_file.append(trace) print("Log file :", log_file) - logging.info("Log file : " + str(log_file)) + logger.info("Log file : " + str(log_file)) my_unnested_list = list(chain(*log_file)) print(my_unnested_list) - logging.info(str(my_unnested_list)) + logger.info(str(my_unnested_list)) for i, x in zip(row_list, my_unnested_list): i.append(x) print("Row list:", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) for i in row_list: i.append("No roam performed all stations are not connected to same ap") print("Row list:", row_list) - logging.info("Row list : " + str(row_list)) + logger.info("Row list : " + str(row_list)) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "station's failed to get ip after the test start" print("Station's failed to get ip after test starts") - logging.info("Station's failed to get ip after test starts") + logger.info("Station's failed to get ip after test starts") if self.duration_based is True: if time.time() > timeout: break except Exception as e: # print(e) - logging.warning(str(e)) + logger.warning(str(e)) pass else: message = "station's failed to get ip at the beginning" print("##### Station's failed to get associate at the beginning") - logging.info("Station's failed to get associate at the beginning") + logger.info("Station's failed to get associate at the beginning") else: print("Stations failed to get ip") - logging.info("Stations failed to get ip") + logger.info("Stations failed to get ip") self.end_time = datetime.now().strftime(TIME_FORMAT) self.test_duration = datetime.strptime(self.end_time, TIME_FORMAT) - datetime.strptime(self.start_time, TIME_FORMAT) @@ -1785,38 +1785,38 @@ def run(self, file_n=None): return kernel_log, message # except Exception as e: - # logging.warning(str(e)) + # logger.warning(str(e)) # Graph generation function def generate_client_pass_fail_graph(self, csv_list=None): try: print("CSV list", csv_list) - logging.info("CSV list " + str(csv_list)) + logger.info("CSV list " + str(csv_list)) x_axis_category = [] for i in range(self.num_sta): x_axis_category.append(i + 1) print(x_axis_category) - logging.info(str(x_axis_category)) + logger.info(str(x_axis_category)) pass_list = [] fail_list = [] dataset = [] for i in csv_list: print("i", i) - logging.info("i, " + i) + logger.info("i, " + i) lf_csv_obj = lf_csv.lf_csv() h = lf_csv_obj.read_csv(file_name=i, column="PASS/FAIL") count = h.count("PASS") print(count) - logging.info(str(count)) + logger.info(str(count)) count_ = h.count("FAIL") print(count_) - logging.info(str(count_)) + logger.info(str(count_)) pass_list.append(count) fail_list.append(count_) dataset.append(pass_list) dataset.append(fail_list) print(dataset) - logging.info(str(dataset)) + logger.info(str(dataset)) # It will contain per station pass and fail number eg [[9, 7], [3, 4]] here 9, 7 are pass number for clients 3 and 4 are fail number # dataset = [[9, 7 , 4], [1, 3, 4]] graph = lf_graph.lf_bar_graph(_data_set=dataset, @@ -1832,10 +1832,10 @@ def generate_client_pass_fail_graph(self, csv_list=None): _remove_border=['top', 'right', 'left'], _alignment={"left": 0.011}, ) graph_png = graph.build_bar_graph() print("graph name {}".format(graph_png)) - logging.info(str("graph name {}".format(graph_png))) + logger.info(str("graph name {}".format(graph_png))) return graph_png except Exception as e: - logging.info(str(e)) + logger.info(str(e)) print(str(e)) def generate_report(self, csv_list, kernel_lst, current_path=None): @@ -2051,7 +2051,7 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): except Exception as e: print(str(e)) - logging.info(str(e)) + logger.info(str(e)) def parse_args(): From 510a4815b34ee4626a938d2b1e194af534dfc1e6 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 14:36:43 -0700 Subject: [PATCH 11/14] lf_roam_test.py: Remove unused or redundant printouts Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 198 ------------------------------------- 1 file changed, 198 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index b05ec1948..034328d0e 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -255,7 +255,6 @@ def __init__(self, lanforge_ip=None, self.mac_data = None self.soft_roam = soft_roam self.multicast = multicast - print("Number of iteration : ", self.iteration) # logging.basicConfig(filename='roam.log', filemode='w', level=logging.INFO, force=True) self.multi_cast_profile = multicast_profile.MULTICASTProfile(self.lanforge_ip, self.lanforge_port, local_realm=self) @@ -360,7 +359,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if self.band == "sixg": radio = self.sixg_radios - print("Creating stations.") logger.info("Creating stations.") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, end_id_=num_sta - 1, padding_number_=10000, @@ -369,7 +367,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if not self.soft_roam: station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: - print("Soft roam true") logger.info("Soft roam true") if self.option == "otds": print("OTDS present") @@ -395,7 +392,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N # station_profile.ssid_pass = self.security_key station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: - print("Soft roam true") logger.info("Soft roam true") if self.option == "otds": print("OTDS present") @@ -498,7 +494,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N network_auth_type="NA", anqp_3gpp_cell_net="NA") station_profile.create(radio=radio, sta_names_=station_list) - print("Waiting for ports to appear") logger.info("Waiting for ports to appear") local_realm.wait_until_ports_appear(sta_list=station_list) @@ -515,21 +510,17 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N "text": 'bgscan="simple:30:-65:300"' } - print(bgscan) logger.info(str(bgscan)) self.local_realm.json_post("/cli-json/set_wifi_custom", bgscan) # time.sleep(2) station_profile.admin_up() - print("Waiting for ports to admin up") logger.info("Waiting for ports to admin up") if local_realm.wait_for_ip(station_list): - print("All stations got IPs") logger.info("All stations got IPs") # exit() return True else: - print("Stations failed to get IPs") logger.info("Stations failed to get IPs") return False @@ -553,10 +544,7 @@ def mcast_stop(self): # Create layer-3 traffic on clients def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu, side_b_min_pdu, traffic_type, sta_list): - print("Station List :", sta_list) logger.info("Station List : ", str(sta_list)) - print(type(sta_list)) - print("Upstream port :", self.upstream) logger.info(str(self.upstream)) self.cx_profile.host = self.lanforge_ip self.cx_profile.port = self.lanforge_port @@ -567,7 +555,6 @@ def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_ self.cx_profile.side_a_min_pdu = side_a_min_pdu, self.cx_profile.side_b_min_pdu = side_b_min_pdu, # Create layer3 end points & run traffic - print("Creating Endpoints") logger.info("Creating Endpoints") self.cx_profile.create(endp_type=traffic_type, side_a=sta_list, side_b=self.upstream, sleep_time=0) self.cx_profile.start_cx() @@ -592,12 +579,10 @@ def get_endp_values(self, endp="A", cx_name="niki", query="tx bytes"): # self.json_get("http://192.168.100.131:8080/endp/Unsetwlan000-0-B?fields=rx%20rate") url = f"/endp/{cx_name}-{endp}?fields={query}" response = self.json_get(_req_url=url) - print(response) if (response is None) or ("endpoint" not in response): print("Incomplete response:") exit(1) final = response["endpoint"][query] - print(final) return final # Pre-Cleanup on lanforge @@ -611,9 +596,7 @@ def precleanup(self): # Get client data from lf def station_data_query(self, station_name="wlan0", query="channel"): url = f"/port/{1}/{1}/{station_name}?fields={query}" - # print("url//////", url) response = self.local_realm.json_get(_req_url=url) - print("Response: ", response) if (response is None) or ("interface" not in response): print("Station_list: incomplete response:") # pprint(response) @@ -714,10 +697,8 @@ def attenuator_modify(self, serno, idx, val): # This is where the main roaming functionality begins def run(self, file_n=None): try: - print("Setting both attenuators to zero attenuation at the beginning.") logger.info("Setting both attenuators to zero attenuation at the beginning.") ser_no = self.attenuator_serial() - print("Available attenuators :", ser_no[0], ser_no[1]) logger.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] @@ -739,10 +720,8 @@ def run(self, file_n=None): # If 'Soft Roam' is selected, initially set the attenuator to zero. if self.soft_roam: - print("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") logger.info("Setting both attenuators to zero attenuation at the beginning for 'soft roam'") ser_no = self.attenuator_serial() - print("Available attenuators :", ser_no[0], ser_no[1]) logger.info("Available attenuators :" + str(ser_no[0]) + " , " + str(ser_no[1])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] @@ -750,7 +729,6 @@ def run(self, file_n=None): self.attenuator_modify(ser_2, "all", 0) # Start sniffer & Create clients with respect to bands - print("Begin sniffing to establish the initial connection.") logger.info("Begin sniffing to establish the initial connection.") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str(self.option) + "start" + "_", @@ -770,10 +748,8 @@ def run(self, file_n=None): # Check if all stations have ip or not sta_list = self.get_station_list() - print("Checking for IP and station list :", sta_list) logger.info("Checking for IP and station list :" + str(sta_list)) if sta_list == "no response": - print("No response from station") logger.info("No response from station") else: # if all stations got ip check mac address for stations val = self.wait_for_ip(sta_list) @@ -782,33 +758,17 @@ def run(self, file_n=None): sta = sta_name.split(".")[2] # use name_to_eid mac = self.station_data_query(station_name=str(sta), query="mac") mac_list.append(mac) - print("List of MAC addresses for all stations :", mac_list) logger.info("List of MAC addresses for all stations :" + str(mac_list)) self.mac_data = mac_list - # if self.debug: - # print("start debug") - # self.start_debug_(mac_list=mac_list) - # print("check for 30 min") - # time.sleep(1800) - print("Stop Sniffer") logger.info("Stop Sniffer") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) - print("pcap file name :", file_name) logger.info("pcap file name : " + str(file_name)) - # if self.debug: - # print("stop debugger") - # self.stop_debug_(mac_list=mac_list) - # # time.sleep(40) - # exit() if val: # if all station got an ip, then check all station are connected to single AP - print("All stations got ip") logger.info("All stations got ip") - print("Check if all stations are connected single ap") logger.info("Check if all stations are connected single ap") # get BSSID'S of all stations - print("Get BSSID's of all stations") logger.info("Get BSSID's of all stations") check = [] for sta_name in sta_list: @@ -816,11 +776,9 @@ def run(self, file_n=None): bssid = self.station_data_query(station_name=str(sta), query="ap") logger.info(str(bssid)) check.append(bssid) - print("BSSID of the current connected stations : ", check) logger.info(str(check)) # Check if all the stations in the BSSID list have the same BSSID. - print("Verifying whether all BSSID's are identical or not.") logger.info("Verifying whether all BSSID's are identical or not.") result = all(element == check[0] for element in check) @@ -837,15 +795,12 @@ def run(self, file_n=None): side_a_min_pdu=1250, side_b_min_pdu=1250) else: # if BSSID's are not identical / same, try to move all clients to one ap - print("Attempt to ensure that all clients are connected to the same AP before " - "initiating a roaming process.") logger.info("Attempt to ensure that all clients are connected to the same AP before " "initiating a roaming process.") count1 = check.count(self.c1_bssid.upper()) count2 = check.count(self.c2_bssid.upper()) checker, new_sta_list, checker2 = None, [], None if count1 > count2: - print("Station connected mostly to ap1") logger.info("Station connected mostly to ap1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() @@ -853,19 +808,15 @@ def run(self, file_n=None): checker = self.c1_bssid.upper() checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(check) if x == checker] - print(index_count) logger.info(str(index_count)) for i in index_count: new_sta_list.append(sta_list[i]) - print("new_sta_list", new_sta_list) logger.info("new_sta_list " + str(new_sta_list)) for sta_name in new_sta_list: eid = self.name_to_eid(sta_name) - print("eid", eid) # sta = sta_name.split(".")[2] # TODO: use name-to-eid sta = eid[2] - print(sta) logger.info(sta) wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { @@ -880,7 +831,6 @@ def run(self, file_n=None): "port": str(sta), "wpa_cli_cmd": wpa_cmd } - print(wifi_cli_cmd_data) logger.info(str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) # TODO: LANforge sta on same radio will share scan results, so you only need to scan on one STA per @@ -914,7 +864,6 @@ def run(self, file_n=None): else: print("wait for 5 mins for next roam process") time.sleep(120) - print("Value of the Variable : ", variable) logger.info("Value of the Variable :" + str(variable)) iterations, number, ser_1, ser_2 = self.iteration, None, None, None if variable != -1: @@ -928,47 +877,38 @@ def run(self, file_n=None): break # Get the serial number of attenuators from lf ser_no = self.attenuator_serial() - print(ser_no[0]) logger.info(str(ser_no[0])) ser_1 = ser_no[0].split(".")[2] ser_2 = ser_no[1].split(".")[2] if self.soft_roam: if iterations % 2 == 0: - print("even set c1 to lowest and c2 to highest attenuation ") logger.info("even set c1 to lowest and c2 to highest attenuation ") number = "even" - print("number", number) logger.info("number " + str(number)) # set attenuation to zero in first attenuator and high in second attenuator self.attenuator_modify(ser_1, "all", 700) self.attenuator_modify(ser_2, "all", 0) else: - print("odd, c1 is already at highest and c2 is at lowest") logger.info("odd, c1 is already at highest and c2 is at lowest") self.attenuator_modify(ser_1, "all", 0) self.attenuator_modify(ser_2, "all", 700) # 700 == 300/400 bgscan 15:-70:300 number = "odd" - print("number", number) logger.info("number " + str(number)) try: # Define row list per iteration row_list = [] sta_list = self.get_station_list() - print("Station list : ", sta_list) logger.info("Station list :" + str(sta_list)) if sta_list == "no response": - print("No response") logger.info("No response") pass else: station = self.wait_for_ip(sta_list) if self.debug: - print("Start debug") logger.info("Start debug") self.start_debug_(mac_list=mac_list) if station: - print("All stations got ip") logger.info("All stations got ip") # Get bssid's of all stations currently connected bssid_list = [] @@ -990,19 +930,14 @@ def run(self, file_n=None): if not result: # Attempt to connect the client to the same AP for each iteration - print("Giving a try to connect") logger.info("Giving a try to connect") - print("Move all clients to one AP") logger.info("Move all clients to one AP") count3 = bssid_list.count(self.c1_bssid.upper()) count4 = bssid_list.count(self.c2_bssid.upper()) - print("Count3", count3) logger.info("Count3 " + str(count3)) - print("Count4", count4) logger.info("Count4 " + str(count4)) checker, new_sta_list, checker2 = None, [], None if count3 > count4: - print("Station connected mostly to AP-1") logger.info("Station connected mostly to AP-1") checker = self.c2_bssid.upper() checker2 = self.c1_bssid.upper() @@ -1010,23 +945,15 @@ def run(self, file_n=None): checker = self.c1_bssid.upper() checker2 = self.c2_bssid.upper() index_count = [i for i, x in enumerate(bssid_list) if x == checker] - print(index_count) logger.info(str(index_count)) for i in index_count: new_sta_list.append(sta_list[i]) - print("new_sta_list", new_sta_list) logger.info("new_sta_list " + str(new_sta_list)) - # for i, x in zip(bssid_list, sta_list): - # if i == checker: - # index_count = bssid_list.index(checker) - # new_sta_list.append(sta_list[index_count]) - # print("new_sta_list", new_sta_list) for sta_name in new_sta_list: # sta = sta_name.split(".")[2] eid = self.name_to_eid(sta_name) sta = eid[2] - print(sta) logger.info(str(sta)) wpa_cmd = "roam " + str(checker2) @@ -1042,7 +969,6 @@ def run(self, file_n=None): "port": str(sta), "wpa_cli_cmd": wpa_cmd } - print(wifi_cli_cmd_data) logger.info(str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) @@ -1054,42 +980,34 @@ def run(self, file_n=None): before_bss = self.station_data_query(station_name=str(sta), query="ap") logger.info(str(before_bss)) before_bssid.append(before_bss) - print("BSSID of the current connected stations : ", before_bssid) logger.info("BSSID of the current connected stations : " + str(before_bssid)) if before_bssid[0] == str(self.c1_bssid.upper()): post_bssid = self.c2_bssid.upper() else: post_bssid = self.c1_bssid.upper() - print("After roaming, the stations will connect to %s the BSSID" % post_bssid) logger.info( "After roaming, the stations will connect to " + str(post_bssid) + "the BSSID") result1 = all(element == before_bssid[0] for element in before_bssid) if result1: - print("All stations connected to same AP") logger.info("All stations connected to same AP") for i in before_bssid: local_row_list = [str(iterations + 1), i] logger.info(str(local_row_list)) row_list.append(local_row_list) - print("Row list :", row_list) logger.info(str(row_list)) # if all bssid are equal then do check to which ap it is connected formated_bssid = before_bssid[0].lower() station_before = "" if formated_bssid == self.c1_bssid: - print("Station connected to chamber1 AP") logger.info("Station connected to chamber1 AP") station_before = formated_bssid elif formated_bssid == self.c2_bssid: - print("Station connected to chamber 2 AP") logger.info("Station connected to chamber 2 AP") station_before = formated_bssid - print("Current connected stations BSSID", station_before) logger.info(str(station_before)) # After checking all conditions start roam and start snifffer - print("Starting sniffer") logger.info("Starting sniffer") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str( @@ -1101,25 +1019,19 @@ def run(self, file_n=None): if number == "even": ser_num = ser_1 ser_num2 = ser_2 - print("even", ser_num) logger.info("even " + str(ser_num)) elif number == "odd": ser_num = ser_2 ser_num2 = ser_1 - print("odd", ser_num) logger.info("odd " + str(ser_num)) # logic to decrease c2 attenuation till 10 db using 1dbm steps status = None - print("checking attenuation") logger.info("checking attenuation") - print("ser num", ser_num) logger.info("ser num " + str(ser_num)) for atten_val2 in range(700, -10, -10): - print(atten_val2) self.attenuator_modify(int(ser_num), "all", atten_val2) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. - print("wait for 4 secs") logger.info("wait for 4 secs") # query bssid's of all stations bssid_check = [] @@ -1129,7 +1041,6 @@ def run(self, file_n=None): # if bssid == "NA": # time.sleep(10) bssid_check.append(bssid) - print(bssid_check) logger.info(str(bssid_check)) # check if all are equal @@ -1138,29 +1049,24 @@ def run(self, file_n=None): station_after = bssid_check[0].lower() if station_after == "N/A" or station_after == "na": status = "station did not roamed" - print("station did not roamed") logger.info("station did not roamed") continue if station_after == station_before: status = "station did not roamed" - print("station did not roamed") logger.info("station did not roamed") continue elif station_after != station_before: - print("client performed roam") logger.info("client performed roam") break if status == "station did not roamed": # set c1 to high for atten_val1 in (range(0, 700, 10)): - print(atten_val1) logger.info(str(atten_val1)) self.attenuator_modify(int(ser_num2), "all", atten_val1) # TODO: You are changing in 1db increments. So, sleep for only 4 seconds # should be enough. # TODO: Add attenuation step to logs to make it more obvious what script is doing. - print("wait for 4 secs") logger.info("wait for 4 secs") bssid_check2 = [] for sta_name in sta_list: @@ -1170,7 +1076,6 @@ def run(self, file_n=None): # if bssid == "NA": # time.sleep(10) bssid_check2.append(bssid) - print(bssid_check2) logger.info(str(bssid_check2)) # check if all are equal result = all(element == bssid_check2[0] for element in bssid_check2) @@ -1178,21 +1083,17 @@ def run(self, file_n=None): station_after = bssid_check2[0].lower() if station_after == "N/A" or station_after == "na": # status = "station did not roamed" - print("station did not roamed") logger.info("station did not roamed") continue if station_after == station_before: # status = "station did not roamed" - print("station did not roamed") logger.info("station did not roamed") continue else: - print('station roamed') logger.info('station roamed') break else: if station_before == self.final_bssid[0]: - print("Connected stations bssid is same to bssid list first element") logger.info( "Connected stations bssid is same to bssid list first element") for sta_name in sta_list: @@ -1216,7 +1117,6 @@ def run(self, file_n=None): "port": str(sta), "wpa_cli_cmd": wpa_cmd } - print("Roam Command : ", wifi_cli_cmd_data) logger.info("Roam Command : " + str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) @@ -1224,7 +1124,6 @@ def run(self, file_n=None): self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) else: - print("Connected stations bssid is same to bssid list second element") logger.info( "Connected stations bssid is same to bssid list second element") for sta_name in sta_list: @@ -1247,7 +1146,6 @@ def run(self, file_n=None): "port": str(sta), "wpa_cli_cmd": wifi_cmd } - print("Roam Command : ", wifi_cli_cmd_data) logger.info("Roam Command : " + str(wifi_cli_cmd_data)) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) @@ -1260,18 +1158,14 @@ def run(self, file_n=None): for i in kernel: kernel_log.append(i) # Stop sniff & Attach data - print("Stop sniffer") logger.info("Stop sniffer") file_name_ = self.stop_sniffer() file_name = "./pcap/" + str(file_name_) - print("pcap file name", file_name) logger.info("pcap file name " + str(file_name)) if self.debug: - print("Stop debugger") logger.info("Stop debugger") self.stop_debug_(mac_list=mac_list) else: - print("Debug is disabled") logger.info("Debug is disabled") self.wait_for_ip(sta_list) bssid_list_1 = [] @@ -1279,11 +1173,9 @@ def run(self, file_n=None): sta = sta_name.split(".")[2] bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list_1.append(bssid) - print("The stations are romed to another AP (%s)" % bssid_list_1) logger.info("The stations are romed to another AP " + str(bssid_list_1)) for i, x in zip(row_list, bssid_list_1): i.append(x) - print("Row list, after roam :", row_list) logger.info("Row list, after roam :" + str(row_list)) trace = self.get_file_name(client=self.num_sta) print("Trace file :", trace) @@ -1294,25 +1186,19 @@ def run(self, file_n=None): all(element == bssid_list_1[0] for element in bssid_list_1) res = "" station_before_ = before_bssid - print("The BSSID of the station before roamed :", station_before_) logger.info("The BSSID of the station before roamed : " + str(station_before_)) # For each mac address query data from pcap for i, x in zip(mac_list, range(len(station_before_))): - print("MAC address :", i) logger.info("MAC address :" + str(i)) - print("BSSID :", bssid_list_1) logger.info(str(bssid_list_1)) query_action_frame_time, auth_time = None, None station_after = bssid_list_1[x] - print("The connected BSSID for stations, after rome :", station_after) logger.info( "The connected BSSID for stations, after rome : " + str(station_after)) if station_after == station_before_[x] or station_after == "na": - print("Station did not roamed") logger.info("Station did not roamed") res = "FAIL" elif station_after != station_before_[x]: - print("Client has performed a roaming operation.") logger.info("Client has performed a roaming operation.") res = "PASS" if res == "FAIL": @@ -1325,25 +1211,18 @@ def run(self, file_n=None): endp_list = self.json_get( "endp?fields=name,eid,rx rate (last)", debug_=False) - print("endpoint", endp_list) local_list, local_list1, final_list = [], [], [] if "endpoint" in endp_list: - print(endp_list["endpoint"]) - for i in range(1, len(endp_list["endpoint"])): local_list.append(endp_list['endpoint'][i]) - print(local_list) new_lst = [] for i in range(len(local_list)): local_list1 = list(local_list[i].keys()) new_lst.append(local_list1[0]) - print(local_list1) - print(new_lst) for i in range(len(new_lst)): final_list.append( endp_list['endpoint'][i + 1][new_lst[i]][ 'rx rate (last)']) - print(final_list) if 0 in final_list: print("try to start multicast few times") print("start multicast once again") @@ -1354,25 +1233,19 @@ def run(self, file_n=None): endp_list = self.json_get( "endp?fields=name,eid,rx rate (last)", debug_=False) - print("endpoint", endp_list) local_list, local_list1, final_list = [], [], [] if "endpoint" in endp_list: - print(endp_list["endpoint"]) for i in range(1, len(endp_list["endpoint"])): local_list.append(endp_list['endpoint'][i]) - print(local_list) new_lst = [] for i in range(len(local_list)): local_list1 = list(local_list[i].keys()) new_lst.append(local_list1[0]) - print(local_list1) - print(new_lst) for i in range(len(new_lst)): final_list.append( endp_list['endpoint'][i + 1][new_lst[i]][ 'rx rate (last)']) - print(final_list) if 0 in final_list: print("multicast did not resumed after few trials") pass_fail_list.append("FAIL") @@ -1386,7 +1259,6 @@ def run(self, file_n=None): pass_fail_list.append("PASS") remark.append("multicast resumed after roam") else: - print("roaming failed") pass_fail_list.append("FAIL") remark.append("bssid does not switched") else: @@ -1401,11 +1273,9 @@ def run(self, file_n=None): file_name=file_name, pyshark_filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) - print(query_reasso_response) logger.info(str(query_reasso_response)) if len(query_reasso_response) != 0 and query_reasso_response != "empty": if query_reasso_response == "Successful": - print("Re-association status is successful") logger.info("Re-association status is successful") if self.sta_type == "normal": reasso_t = self.pcap_obj.read_time( @@ -1417,10 +1287,8 @@ def run(self, file_n=None): pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( str(i))) - print("Re-association time is", reasso_t) logger.info("Re-association time is " + str(reasso_t)) if self.option == "otds": - print("Checking for Action Frame") logger.info("Checking for Action Frame") # Action frame check @@ -1430,14 +1298,11 @@ def run(self, file_n=None): str(i))) print("Action Frame", query_action_frame) if len(query_action_frame) != 0 and query_action_frame != "empty": - print("Action frame is present") logger.info("Action frame is present") query_action_frame_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) - print("Action frame time is", - query_action_frame_time) logger.info( "Action frame time is " + str(reasso_t)) else: @@ -1445,10 +1310,8 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("No Action Frame") - print("Row list :", row_list) logger.info("Row list " + str(row_list)) else: - print("Checking for Authentication Frame") logger.info("Checking for Authentication Frame") if self.sta_type == "normal": query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( @@ -1464,7 +1327,6 @@ def run(self, file_n=None): query_auth_response) if len(query_auth_response) != 0 and query_auth_response != "empty": if query_auth_response == "Successful": - print("Authentication Request Frame is present") logger.info( "Authentication Request Frame is present") if self.sta_type == "normal": @@ -1477,8 +1339,6 @@ def run(self, file_n=None): pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( # noqa: E501 str(i))) - print("Authentication Request Frame time is", - auth_time) logger.info( "Authentication Request Frame time is" + str( auth_time)) @@ -1492,14 +1352,12 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("No Auth frame") - print("Row list :", row_list) logger.info("row list " + str(row_list)) # roam_time = None if self.option == "otds": roam_time = reasso_t - query_action_frame_time else: roam_time = reasso_t - auth_time - print("Roam Time (ms)", roam_time) logger.info("Roam Time (ms)" + str(roam_time)) roam_time1.append(roam_time) if self.option == "ota": @@ -1520,8 +1378,6 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("Reassociation failure") - print( - "pcap_file name for fail instance of iteration value ") logger.info( "pcap_file name for fail instance of iteration value ") else: @@ -1529,7 +1385,6 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("No Reasso response") - print("Row list : ", row_list) logger.info("row list " + str(row_list)) else: query_reasso_response = self.get_wlan_mgt_status( @@ -1540,15 +1395,12 @@ def run(self, file_n=None): logger.info(str(query_reasso_response)) if len(query_reasso_response) != 0 and query_reasso_response != 'empty': if query_reasso_response == "Successful": - print("Re-Association status is successful") logger.info("Re-Association status is successful") reasso_t = self.pcap_obj.read_time(pcap_file=str(file_name), filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % ( # noqa: E501 str(i))) - print("Re-Association time is", reasso_t) logger.info("Re-Association time is " + str(reasso_t)) if self.option == "otds": - print("Check for Action frame") logger.info("Check for Action Frame") # action frame check @@ -1557,14 +1409,11 @@ def run(self, file_n=None): filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) if len(query_action_frame) != 0 and query_action_frame != "empty": - print("Action Frame is present") logger.info("Action Frame is present") query_action_frame_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.category_code == 6) && (wlan.sa == %s)" % ( str(i))) - print("Action Frame time is", - query_action_frame_time) logger.info( "Action Frame) time is " + str(reasso_t)) else: @@ -1572,10 +1421,8 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid miNo Action Frame") - print("Row list :", row_list) logger.info("Row list :" + str(row_list)) else: - print("Check for Authentication Frame") logger.info("Check for Authentication Frame") query_auth_response = self.pcap_obj.get_wlan_mgt_status_code( pcap_file=str(file_name), @@ -1583,14 +1430,12 @@ def run(self, file_n=None): str(i))) if len(query_auth_response) != 0 and query_auth_response != "empty": if query_auth_response == "Successful": - print("Authentication Request is present") logger.info( "Authentication Request is present") auth_time = self.pcap_obj.read_time( pcap_file=str(file_name), filter="(wlan.fixed.auth.alg == 2 && wlan.fixed.status_code == 0x0000 && wlan.fixed.auth_seq == 0x0001) && (wlan.sa == %s)" % ( # noqa: E501 str(i))) - print("Authentication time is", auth_time) logger.info( "Authentication time is " + str(auth_time)) else: @@ -1603,14 +1448,12 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("bssid mismatched No Auth frame") - print("Row list :", row_list) logger.info("Row list :" + str(row_list)) # roam_time = None if self.option == "otds": roam_time = reasso_t - query_action_frame_time else: roam_time = reasso_t - auth_time - print("Roam time (ms)", roam_time) logger.info("Roam time (ms) " + str(roam_time)) roam_time1.append(roam_time) if self.option == "ota": @@ -1638,59 +1481,45 @@ def run(self, file_n=None): pass_fail_list.append("FAIL") pcap_file_list.append(str(file_name)) remark.append("BSSID mismatched , No Reasso response") - print("Row list :", row_list) logger.info("row list " + str(row_list)) if self.multicast == "True": - print(row_list) - print(pass_fail_list) - print(remark) for i, x in zip(row_list, pass_fail_list): i.append(x) for i, x in zip(row_list, remark): i.append(x) - print("Row list :", row_list) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: for i, x in zip(row_list, roam_time1): i.append(x) - print("Row list :", row_list) logger.info("Row list : " + str(row_list)) # for i, x in zip(row_list, packet_loss_lst): # i.append(x) for i, x in zip(row_list, pass_fail_list): i.append(x) - print("Row list :", row_list) logger.info("Row list : " + str(row_list)) for i, x in zip(row_list, pcap_file_list): i.append(x) - print("Log file :", log_file) logger.info("Log file : " + str(log_file)) my_unnested_list = list(chain(*log_file)) - print(my_unnested_list) logger.info(str(my_unnested_list)) for i, x in zip(row_list, my_unnested_list): i.append(x) - print("Row list :", row_list) for i, x in zip(row_list, remark): i.append(x) - print("Row list :", row_list) logger.info("row list " + str(row_list)) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "all stations are not connected to same ap for iteration " + str( iterations) - print("All stations are not connected to same ap") logger.info("All stations are not connected to same ap") - print("Starting Sniffer") logger.info("Starting Sniffer") self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str( self.option) + "_iteration_" + str( iterations) + "_", duration=3600) - print("Stop Sniffer") logger.info("Stop Sniffer") self.stop_sniffer() kernel = self.journal_ctl_logs(file=str(iterations)) @@ -1702,79 +1531,61 @@ def run(self, file_n=None): local_row_list = [str(iterations)] sta = sta_name.split(".")[2] before_bssid_ = self.station_data_query(station_name=str(sta), query="ap") - print(before_bssid_) logger.info(str(before_bssid_)) bssid_list2.append(before_bssid_) local_row_list.append(before_bssid_) - print(local_row_list) logger.info(str(local_row_list)) row_list.append(local_row_list) - print(row_list) logger.info(str(row_list)) for i, x in zip(row_list, bssid_list2): i.append(x) - print("Row list :", row_list) logger.info("Row list : " + str(row_list)) if self.multicast == "True": for a in row_list: a.append("FAIL") - print("Row list :", row_list) else: for i in row_list: i.append("No Roam Time") - print("Row list :", row_list) logger.info("Row list : " + str(row_list)) for a in row_list: a.append("FAIL") - print("Row list :", row_list) logger.info("Row list : " + str(row_list)) # pcap for i in row_list: i.append("N/A") - print("Row list:", row_list) logger.info("Row list : " + str(row_list)) if self.debug: - print("Stop Debugger") logger.info("Stop Debugger") self.stop_debug_(mac_list=mac_list) else: - print("Debug is disabled") logger.info("Debug is disabled") trace = self.get_file_name(client=self.num_sta) log_file.append(trace) - print("Log file :", log_file) logger.info("Log file : " + str(log_file)) my_unnested_list = list(chain(*log_file)) - print(my_unnested_list) logger.info(str(my_unnested_list)) for i, x in zip(row_list, my_unnested_list): i.append(x) - print("Row list:", row_list) logger.info("Row list : " + str(row_list)) for i in row_list: i.append("No roam performed all stations are not connected to same ap") - print("Row list:", row_list) logger.info("Row list : " + str(row_list)) for i, x in zip(file_n, row_list): self.lf_csv_obj.open_csv_append(fields=x, name=i) else: message = "station's failed to get ip after the test start" - print("Station's failed to get ip after test starts") logger.info("Station's failed to get ip after test starts") if self.duration_based is True: if time.time() > timeout: break except Exception as e: - # print(e) logger.warning(str(e)) pass else: message = "station's failed to get ip at the beginning" - print("##### Station's failed to get associate at the beginning") logger.info("Station's failed to get associate at the beginning") else: - print("Stations failed to get ip") logger.info("Stations failed to get ip") self.end_time = datetime.now().strftime(TIME_FORMAT) @@ -1790,32 +1601,26 @@ def run(self, file_n=None): # Graph generation function def generate_client_pass_fail_graph(self, csv_list=None): try: - print("CSV list", csv_list) logger.info("CSV list " + str(csv_list)) x_axis_category = [] for i in range(self.num_sta): x_axis_category.append(i + 1) - print(x_axis_category) logger.info(str(x_axis_category)) pass_list = [] fail_list = [] dataset = [] for i in csv_list: - print("i", i) logger.info("i, " + i) lf_csv_obj = lf_csv.lf_csv() h = lf_csv_obj.read_csv(file_name=i, column="PASS/FAIL") count = h.count("PASS") - print(count) logger.info(str(count)) count_ = h.count("FAIL") - print(count_) logger.info(str(count_)) pass_list.append(count) fail_list.append(count_) dataset.append(pass_list) dataset.append(fail_list) - print(dataset) logger.info(str(dataset)) # It will contain per station pass and fail number eg [[9, 7], [3, 4]] here 9, 7 are pass number for clients 3 and 4 are fail number # dataset = [[9, 7 , 4], [1, 3, 4]] @@ -1831,12 +1636,10 @@ def generate_client_pass_fail_graph(self, csv_list=None): _legend_loc="upper right", _legend_fontsize=12, _legend_box=(1.12, 1.01), _remove_border=['top', 'right', 'left'], _alignment={"left": 0.011}, ) graph_png = graph.build_bar_graph() - print("graph name {}".format(graph_png)) logger.info(str("graph name {}".format(graph_png))) return graph_png except Exception as e: logger.info(str(e)) - print(str(e)) def generate_report(self, csv_list, kernel_lst, current_path=None): try: @@ -2050,7 +1853,6 @@ def generate_report(self, csv_list, kernel_lst, current_path=None): return report_path except Exception as e: - print(str(e)) logger.info(str(e)) From a822a7e15d24a6ca504c1aa36c083ddda98e7cb4 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 14:52:04 -0700 Subject: [PATCH 12/14] lf_roam_test.py: Create station commenting/whitespace, some consolidation Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 50 ++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 034328d0e..af7db5738 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -352,6 +352,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) station_profile = local_realm.new_station_profile() + if self.band == "fiveg": radio = self.fiveg_radios if self.band == "twog": @@ -359,37 +360,34 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if self.band == "sixg": radio = self.sixg_radios - logger.info("Creating stations.") + logger.info(f"Using LANforge radio(s) {radio} for test station(s)") station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, end_id_=num_sta - 1, padding_number_=10000, radio=radio) if self.sta_type == "normal": if not self.soft_roam: + logger.info("Soft roam disabled") station_profile.set_command_flag("add_sta", "disable_roam", 1) - if self.soft_roam: - logger.info("Soft roam true") + else: + logger.info("Soft roam enabled") if self.option == "otds": - print("OTDS present") + logger.info("Enabling 802.11r FT-DS") station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) if self.sta_type == "11r-sae-802.1x": dut_passwd = "[BLANK]" + + # Settings for all stations station_profile.use_security(dut_security, dut_ssid, dut_passwd) station_profile.set_number_template("00") - station_profile.set_command_flag("add_sta", "create_admin_down", 1) - station_profile.set_command_param("set_port", "report_timer", 1500) + station_profile.set_command_flag("set_port", "rpt_timer", 1) # TODO: Is this redundant? - # connect station to particular bssid - # self.station_profile.set_command_param("add_sta", "ap", self.bssid[0]) - - station_profile.set_command_flag("set_port", "rpt_timer", 1) if self.sta_type == "11r": station_profile.set_command_flag("add_sta", "80211u_enable", 0) station_profile.set_command_flag("add_sta", "8021x_radius", 1) if not self.soft_roam: - # station_profile.ssid_pass = self.security_key station_profile.set_command_flag("add_sta", "disable_roam", 1) if self.soft_roam: logger.info("Soft roam true") @@ -466,8 +464,6 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N if self.soft_roam: if self.option == "otds": station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) - # station_profile.set_command_flag("add_sta", "disable_roam", 1) - # station_profile.set_command_flag("add_sta", "ap", "68:7d:b4:5f:5c:3f") station_profile.set_wifi_extra(key_mgmt="FT-EAP ", pairwise="[BLANK]", group="[BLANK]", @@ -493,35 +489,43 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N ipaddr_type_avail="NA", network_auth_type="NA", anqp_3gpp_cell_net="NA") + + # Create stations + logger.info(f"Creating {num_sta} test stations configurated for test AP SSID {dut_ssid}") + logger.debug(f"Creating stations: {station_list}") station_profile.create(radio=radio, sta_names_=station_list) - logger.info("Waiting for ports to appear") + + logger.info("Waiting stations to initialize") local_realm.wait_until_ports_appear(sta_list=station_list) if self.soft_roam: + bgscan_config = 'bgscan="simple:30:-65:300"' + logger.info(f"Enabling background scanning for test stations: {bgscan_config}") + for sta_name in station_list: + logger.debug(f"Enabling background scanning for station {sta_name}") sta = sta_name.split(".")[2] # TODO: Use name_to_eid - # wpa_cmd = "roam " + str(checker2) - bgscan = { "shelf": 1, "resource": 1, # TODO: Do not hard-code resource, get it from radio eid I think. "port": str(sta), "type": 'NA', - "text": 'bgscan="simple:30:-65:300"' + "text": bgscan_config, } - - logger.info(str(bgscan)) self.local_realm.json_post("/cli-json/set_wifi_custom", bgscan) - # time.sleep(2) + # Bring up stations + logger.info("Setting stations to active state") station_profile.admin_up() - logger.info("Waiting for ports to admin up") + + # Wait for stations to connect + logger.info("Waiting for stations to connect (associate and configure IP)") if local_realm.wait_for_ip(station_list): - logger.info("All stations got IPs") + logger.info("All stations connected") # exit() return True else: - logger.info("Stations failed to get IPs") + logger.error(f"One or more stations did not connect to test AP SSID {dut_ssid}") return False # create a multicast profile From 10c30d105eb4ec8dafcf61bce5509f978e29cc05 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 15:41:00 -0700 Subject: [PATCH 13/14] lf_roam_test.py: Rework station creation logic Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 160 ++++++++++--------------------------- 1 file changed, 41 insertions(+), 119 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index af7db5738..07345910d 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -353,142 +353,64 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) station_profile = local_realm.new_station_profile() - if self.band == "fiveg": - radio = self.fiveg_radios if self.band == "twog": radio = self.twog_radios - if self.band == "sixg": + elif self.band == "fiveg": + radio = self.fiveg_radios + else: radio = self.sixg_radios logger.info(f"Using LANforge radio(s) {radio} for test station(s)") - station_list = LFUtils.portNameSeries(prefix_=sta_prefix, start_id_=start_id, - end_id_=num_sta - 1, padding_number_=10000, + station_list = LFUtils.portNameSeries(prefix_=sta_prefix, + start_id_=start_id, + end_id_=num_sta - 1, + padding_number_=10000, radio=radio) - if self.sta_type == "normal": - if not self.soft_roam: - logger.info("Soft roam disabled") - station_profile.set_command_flag("add_sta", "disable_roam", 1) - else: - logger.info("Soft roam enabled") - if self.option == "otds": - logger.info("Enabling 802.11r FT-DS") - station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) + if not self.soft_roam: + logger.info("Soft roam disabled") + station_profile.set_command_flag("add_sta", "disable_roam", 1) + else: + logger.info("Soft roam enabled") + + if self.option == "otds": + logger.info("Enabling 802.11r FT-DS") + station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) if self.sta_type == "11r-sae-802.1x": dut_passwd = "[BLANK]" # Settings for all stations - station_profile.use_security(dut_security, dut_ssid, dut_passwd) - station_profile.set_number_template("00") + station_profile.use_security(self.security, self.ssid_name, self.security_key) station_profile.set_command_flag("add_sta", "create_admin_down", 1) station_profile.set_command_param("set_port", "report_timer", 1500) - station_profile.set_command_flag("set_port", "rpt_timer", 1) # TODO: Is this redundant? + station_profile.set_command_flag("add_sta", "80211u_enable", 0) - if self.sta_type == "11r": - station_profile.set_command_flag("add_sta", "80211u_enable", 0) - station_profile.set_command_flag("add_sta", "8021x_radius", 1) - if not self.soft_roam: - station_profile.set_command_flag("add_sta", "disable_roam", 1) - if self.soft_roam: - logger.info("Soft roam true") - if self.option == "otds": - print("OTDS present") - station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) - station_profile.set_wifi_extra(key_mgmt="FT-PSK ", - pairwise="", - group="", - psk="", - eap="", - identity="", - passwd="", - pin="", - phase1="NA", - phase2="NA", - pac_file="NA", - private_key="NA", - pk_password="NA", - hessid="00:00:00:00:00:01", - realm="localhost.localdomain", - client_cert="NA", - imsi="NA", - milenage="NA", - domain="localhost.localdomain", - roaming_consortium="NA", - venue_group="NA", - network_type="NA", - ipaddr_type_avail="NA", - network_auth_type="NA", - anqp_3gpp_cell_net="NA") - if self.sta_type == "11r-sae": + # WPA3 requires PMF (802.11w) + if "sae" in self.sta_type: station_profile.set_command_flag("add_sta", "ieee80211w", 2) - station_profile.set_command_flag("add_sta", "80211u_enable", 0) - station_profile.set_command_flag("add_sta", "8021x_radius", 1) - # station_profile.set_command_flag("add_sta", "disable_roam", 1) - if not self.soft_roam: - station_profile.set_command_flag("add_sta", "disable_roam", 1) - if self.soft_roam: - if self.option == "otds": - station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) - station_profile.set_wifi_extra(key_mgmt="FT-SAE ", - pairwise="", - group="", - psk="", - eap="", - identity="", - passwd="", - pin="", - phase1="NA", - phase2="NA", - pac_file="NA", - private_key="NA", - pk_password="NA", - hessid="00:00:00:00:00:01", - realm="localhost.localdomain", - client_cert="NA", - imsi="NA", - milenage="NA", - domain="localhost.localdomain", - roaming_consortium="NA", - venue_group="NA", - network_type="NA", - ipaddr_type_avail="NA", - network_auth_type="NA", - anqp_3gpp_cell_net="NA") - if self.sta_type == "11r-sae-802.1x": - station_profile.set_command_flag("set_port", "rpt_timer", 1) - station_profile.set_command_flag("add_sta", "ieee80211w", 2) - station_profile.set_command_flag("add_sta", "80211u_enable", 0) - station_profile.set_command_flag("add_sta", "8021x_radius", 1) - if not self.soft_roam: - station_profile.set_command_flag("add_sta", "disable_roam", 1) - if self.soft_roam: - if self.option == "otds": - station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) - station_profile.set_wifi_extra(key_mgmt="FT-EAP ", - pairwise="[BLANK]", - group="[BLANK]", - psk="[BLANK]", + + if self.sta_type == "11r" or self.sta_type == "11r-sae": + if self.sta_type == "11r": + key_mgmt = "FT-PSK" + else: + key_mgmt = "FT-SAE" + + # Have to set 'Advanced/802.1X' flag in order for 'psk' argument to take. + # This works around limitation in the GUI which does a check for 'Key/Phrase' + # length when WPA/WPA2/WPA3 enabled (but that field is sadly also cleared here) + station_profile.set_wifi_extra(key_mgmt=key_mgmt, + psk=self.security_key) + station_profile.set_command_flag(command_name="add_sta", + param_name="8021x_radius", + value=1) # Enable Advanced/802.1X flag + elif self.sta_type == "11r-sae-802.1x": + station_profile.set_command_flag(command_name="add_sta", + param_name="8021x_radius", + value=1) # Enable Advanced/802.1X flag + station_profile.set_wifi_extra(key_mgmt="FT-EAP", eap="TTLS", identity=self.identity, - passwd=self.ttls_pass, - pin="", - phase1="NA", - phase2="NA", - pac_file="NA", - private_key="NA", - pk_password="NA", - hessid="00:00:00:00:00:01", - realm="localhost.localdomain", - client_cert="NA", - imsi="NA", - milenage="NA", - domain="localhost.localdomain", - roaming_consortium="NA", - venue_group="NA", - network_type="NA", - ipaddr_type_avail="NA", - network_auth_type="NA", - anqp_3gpp_cell_net="NA") + passwd=self.ttls_pass) # Create stations logger.info(f"Creating {num_sta} test stations configurated for test AP SSID {dut_ssid}") From 5b275587784772f38facbbefcfd87f569018af13 Mon Sep 17 00:00:00 2001 From: Alex Gavin Date: Thu, 7 May 2026 15:46:34 -0700 Subject: [PATCH 14/14] lf_roam_test.py: Rework station creation function args Verified CLI: ./lf_roam_test.py \ --iteration_based \ --ap1_bssid xx:xx:xx:xx:xx:xx \ --ap2_bssid xx:xx:xx:xx:xx:xx \ --ssid_name testssid \ --password password \ --security wpa2 \ --upstream 1.1.eth3 ./lf_roam_test.py \ --iteration_based \ --ap1_bssid xx:xx:xx:xx:xx:xx \ --ap2_bssid xx:xx:xx:xx:xx:xx \ --ssid_name testssid \ --password password \ --security wpa2 \ --upstream 1.1.eth3 \ --sta_type 11r Signed-off-by: Alex Gavin --- py-scripts/lf_roam_test.py | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/py-scripts/lf_roam_test.py b/py-scripts/lf_roam_test.py index 07345910d..2b5cf54f0 100755 --- a/py-scripts/lf_roam_test.py +++ b/py-scripts/lf_roam_test.py @@ -346,10 +346,7 @@ def get_station_list(self): sta_list.append(j) return sta_list - # Create N - number of clients of advanced configuration on lf - def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None, - dut_security=None, dut_passwd=None, radio=None): - + def create_stations(self, radio): local_realm = realm.Realm(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port) station_profile = local_realm.new_station_profile() @@ -361,9 +358,9 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N radio = self.sixg_radios logger.info(f"Using LANforge radio(s) {radio} for test station(s)") - station_list = LFUtils.portNameSeries(prefix_=sta_prefix, - start_id_=start_id, - end_id_=num_sta - 1, + station_list = LFUtils.portNameSeries(prefix_="sta", + start_id_=0, + end_id_=self.num_sta - 1, padding_number_=10000, radio=radio) if not self.soft_roam: @@ -377,7 +374,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N station_profile.set_command_flag("add_sta", "ft-roam-over-ds", 1) if self.sta_type == "11r-sae-802.1x": - dut_passwd = "[BLANK]" + self.security_key = "[BLANK]" # Settings for all stations station_profile.use_security(self.security, self.ssid_name, self.security_key) @@ -413,7 +410,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N passwd=self.ttls_pass) # Create stations - logger.info(f"Creating {num_sta} test stations configurated for test AP SSID {dut_ssid}") + logger.info(f"Creating {self.num_sta} test stations configurated for test AP SSID {self.ssid_name}") logger.debug(f"Creating stations: {station_list}") station_profile.create(radio=radio, sta_names_=station_list) @@ -447,7 +444,7 @@ def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=N # exit() return True else: - logger.error(f"One or more stations did not connect to test AP SSID {dut_ssid}") + logger.error(f"One or more stations did not connect to test AP SSID {self.ssid_name}") return False # create a multicast profile @@ -659,18 +656,16 @@ def run(self, file_n=None): self.start_sniffer(radio_channel=self.channel, radio=self.sniff_radio, test_name="roam_" + str(self.sta_type) + "_" + str(self.option) + "start" + "_", duration=3600) + if self.band == "twog": self.local_realm.reset_port(self.twog_radios) - self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name, - dut_security=self.security, dut_passwd=self.security_key, radio=self.twog_radios) - if self.band == "fiveg": + self.create_stations(radio=self.twog_radios) + elif self.band == "fiveg": self.local_realm.reset_port(self.fiveg_radios) - self.create_n_clients(sta_prefix="wlan", num_sta=self.num_sta, dut_ssid=self.ssid_name, - dut_security=self.security, dut_passwd=self.security_key, radio=self.fiveg_radios) - if self.band == "sixg": + self.create_stations(radio=self.fiveg_radios) + else: self.local_realm.reset_port(self.sixg_radios) - self.create_n_clients(sta_prefix="wlan", num_sta=self.num_sta, dut_ssid=self.ssid_name, - dut_security=self.security, dut_passwd=self.security_key, radio=self.sixg_radios) + self.create_stations(radio=self.sixg_radios) # Check if all stations have ip or not sta_list = self.get_station_list()