Skip to content

Commit

Permalink
testcases/loading: add a new class of tests
Browse files Browse the repository at this point in the history
These tests load the SMB server and check the results.

Signed-off-by: Sachin Prabhu <[email protected]>
  • Loading branch information
spuiuk committed Apr 2, 2024
1 parent c364d47 commit 3960513
Showing 1 changed file with 253 additions and 0 deletions.
253 changes: 253 additions & 0 deletions testcases/loading/test_loading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
#
# A simple load test
#
# We use python threads to open up several consecutive connections
# on the SMB server and perform either open/write, open/read and delete
# operations with an interval of 0-0.5 seconds between each operation.
# The tests are run for fixed duration of time before we stop and
# print out the stats for the number of operations performed
#


import testhelper
import random
import time
import threading
import typing
import pytest
import os


class SimpleLoadTest:
"""A helper class to generate a simple load on a SMB server"""

instance_num = 0
max_files = 10
test_string = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

def __init__(
self,
hostname: str,
share: str,
username: str,
passwd: str,
testdir: str,
):
self.idnum = type(self).instance_num
type(self).instance_num += 1
self.testdir = testdir
self.rootpath = f"{testdir}/test{self.idnum}"
self.files: typing.List[str] = []
self.thread = None
self.run_test = False
# Operations in the frequency of which they are called
self.ops = ["write", "read", "read", "read", "delete"]
self.stats = {"read": 0, "write": 0, "delete": 0, "error": 0}
self.smbclient = testhelper.SMBClient(
hostname, share, username, passwd
)

def disconnect(self) -> None:
self.smbclient.disconnect()

def _new_file(self) -> str:
# Don't go above max_files
if len(self.files) >= self.max_files:
return ""
file = "file" + str(random.randint(0, 1000))
if file not in self.files:
self.files.append(file)
return f"{self.rootpath}/{file}"
return self._new_file()

def _get_file(self) -> str:
if not self.files:
return ""
file = random.choice(self.files)
return f"{self.rootpath}/{file}"

def _del_file(self) -> str:
if not self.files:
return ""
file = random.choice(self.files)
self.files.remove(file)
return f"{self.rootpath}/{file}"

def _simple_run(self, op=""):
if op == "":
op = random.choice(self.ops)
try:
if op == "read":
file = self._get_file()
if not file:
# If file doesn't exist, then run an open op first
self._simple_run(op="write")
return
self.stats["read"] += 1
self.smbclient.read_text(file)
elif op == "write":
file = self._new_file()
if not file:
return
self.stats["write"] += 1
self.smbclient.write_text(file, type(self).test_string)
elif op == "delete":
file = self._del_file()
if not file:
return
self.stats["delete"] += 1
self.smbclient.unlink(file)
except IOError as error:
print(error)
self.error += 1
# sleep between 0-0.5 seconds between each op
time.sleep(random.random() * 0.5)

def _clean_up(self):
for file in self.files:
self.smbclient.unlink(f"{self.rootpath}/{file}")
self.files = []

def simple_load(self):
self.smbclient.mkdir(self.rootpath)
while self.run_test is True:
self._simple_run()
self._clean_up()
self.smbclient.rmdir(self.rootpath)

def start(self):
self.run_test = True
self.thread = threading.Thread(target=self.simple_load, args=())
self.thread.start()

def stop(self):
self.run_test = False

def cleanup(self):
if self.run_test:
self.stop()
self.thread.join()
self.disconnect()


class LoadTest:
def __init__(
self,
hostname: str,
share: str,
username: str,
passwd: str,
testdir: str,
):
self.server = hostname
self.share = share
self.username = username
self.password = passwd
self.testdir = testdir
self.connections: typing.List[SimpleLoadTest] = []

def get_connection_num(self) -> int:
return len(self.connections)

def set_connection_num(self, num: int) -> None:
cn = self.get_connection_num()
if cn < num:
for _ in range(0, num - cn):
self.connections.append(
SimpleLoadTest(
self.server,
self.share,
self.username,
self.password,
self.testdir,
)
)
elif cn >= num:
for s in self.connections[num:]:
s.disconnect()
del self.connections[num:]

def total_stats(self) -> typing.Dict[str, int]:
total_stats = {"write": 0, "read": 0, "delete": 0, "error": 0}
for smbcon in self.connections:
stats = smbcon.stats
total_stats["write"] += stats["write"]
total_stats["read"] += stats["read"]
total_stats["delete"] += stats["delete"]
total_stats["error"] += stats["error"]
return total_stats

def start_tests(self):
for s in self.connections:
s.start()

def stop_tests(self):
for smbcon in self.connections:
smbcon.stop()
for smbcon in self.connections:
smbcon.cleanup()


test_info_file = os.getenv("TEST_INFO_FILE")
test_info = testhelper.read_yaml(test_info_file)


def generate_loading_check() -> typing.List[tuple[str, str]]:
arr = []
for sharename in testhelper.get_exported_shares(test_info):
share = testhelper.get_share(test_info, sharename)
arr.append((share["server"], share["name"]))
return arr


@pytest.mark.parametrize("hostname,sharename", generate_loading_check())
def test_loading(hostname: str, sharename: str) -> None:
mount_params = testhelper.get_mount_parameters(test_info, sharename)
testdir = "/loadtest"
# Open a connection to create and finally remove the testdir
smbcon = testhelper.SMBClient(
hostname,
mount_params["share"],
mount_params["username"],
mount_params["password"],
)
smbcon.mkdir(testdir)

# Start load test
loadtest = LoadTest(
hostname,
mount_params["share"],
mount_params["username"],
mount_params["password"],
testdir,
)
# 100 consecutive connections
loadtest.set_connection_num(100)
loadtest.start_tests()
# Total 30 seconds of runtime
total_stats_old = {"write": 0, "read": 0, "delete": 0, "error": 0}
for i in range(6):
time.sleep(5)
total_stats = loadtest.total_stats()
print(
f"{i*5}s to {(i+1)*5}s stats - "
f'read: {total_stats["read"] - total_stats_old["read"]} '
f'write: {total_stats["write"] - total_stats_old["write"]} '
f'delete: {total_stats["delete"] - total_stats_old["delete"]} '
f'errors: {total_stats["error"] - total_stats_old["error"]}'
)
total_stats_old = total_stats
loadtest.stop_tests()
# End load test

total_stats = loadtest.total_stats()
print(
f"Totals: "
f'read: {total_stats["read"]} '
f'write: {total_stats["write"]} '
f'delete: {total_stats["delete"]} '
f'errors: {total_stats["error"]}'
)

smbcon.rmdir(testdir)
smbcon.disconnect()

0 comments on commit 3960513

Please sign in to comment.