Skip to content

Commit

Permalink
Add latency logs
Browse files Browse the repository at this point in the history
  • Loading branch information
iVishalr committed Jun 25, 2023
1 parent 8e0eab8 commit 6fe257e
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 234 deletions.
10 changes: 5 additions & 5 deletions config/evaluator.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"backend": {
"name": "backend",
"log_dir": "/home/vishalr/Desktop/backend-2022/logs/",
"config_dir": "/home/vishalr/Desktop/backend-2022/configs/"
"log_dir": "/home/vishalramesh01/backend-2022/logs/",
"config_dir": "/home/vishalramesh01/backend-2022/configs/"
},

"executor": {
"name": "Assignment2",
"num_backends": 2,
"name": "eval-test",
"num_backends": 1,
"num_workers": 1,
"fetch_port": 9000,
"fetch_route": "get-jobs",
Expand All @@ -31,7 +31,7 @@
"docker_port": 10000,
"docker_route": "run_job",
"docker_image": "hadoop-3.2.2:0.1",
"cpu_limit": 6,
"cpu_limit": 2,
"taskset": true,
"memory_limit": "8000m",
"shared_output_dir": "/home/vishalr/Desktop/backend-2022/output/",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[21/04/2023 16:07:38] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682076194 | Wait : 10.2921s Processing : 33.6957s Total: 45.0553s
[21/04/2023 16:08:12] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682075506 | Wait : 41.9155s Processing : 34.4546s Total: 77.3275s
[21/04/2023 16:08:46] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682075079 | Wait : 73.9085s Processing : 34.3627s Total: 109.1788s
[21/04/2023 16:09:20] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682074665 | Wait : 106.4873s Processing : 33.7756s Total: 141.2229s
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[21/04/2023 16:03:13] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682075960 | Wait : 11.2492s Processing : 31.2768s Total: 45.2602s
[21/04/2023 16:03:41] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682081177 | Wait : 37.2587s Processing : 31.4242s Total: 70.8305s
[21/04/2023 16:04:09] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682074946 | Wait : 63.9851s Processing : 29.8131s Total: 96.7524s
[21/04/2023 16:04:37] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682081884 | Wait : 90.8077s Processing : 29.1787s Total: 121.9034s
[21/04/2023 16:05:06] [output_processor] Team : BD1_ADMIN_09 Assignment ID : A2T1_1682079652 | Wait : 111.1782s Processing : 30.1985s Total: 147.2870s
6 changes: 6 additions & 0 deletions flask_backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ def executor_log():
f.write(logs[logname])
f.close()

logs = json.loads(data["lats_logs"])
for logname in logs:
f = open(os.path.join(executor_log_path, logname), "a+")
f.write(logs[logname])
f.close()

logs = json.loads(data["syslogs"])
for logname in logs:
f = open(os.path.join(executor_log_path, logname), "a+")
Expand Down
17 changes: 16 additions & 1 deletion job_tracker/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ def get_op_logs(self, path, offset):
logs[filename] = data
return logs, new_offset

def get_lats_logs(self, path, offset):
    """Collect latency ("lats") log text from *path*, resuming at *offset*.

    Scans the directory for files whose names contain both ``.txt`` and
    ``lats``, reads each one from *offset* via ``self.read_logs``, and
    returns the newly-read text keyed by filename together with the offset
    to resume from on the next polling cycle.

    Args:
        path: Directory holding this executor's log files.
        offset: Position already consumed by a previous call.

    Returns:
        Tuple ``(logs, new_offset)`` where ``logs`` maps each matching
        filename to its newly-read data, and ``new_offset`` is the resume
        position for the next call.
    """
    logs = {}
    # Bug fix: start from the caller's offset rather than 0. Previously a
    # cycle with no matching files returned 0, resetting the caller's
    # saved offset and causing already-forwarded log content to be re-read
    # and re-sent on the following cycle.
    new_offset = offset
    for filename in os.listdir(path):
        # Substring match (rather than endswith) mirrors the sibling
        # get_*_logs helpers; presumably filenames look like "lats.txt".
        if ".txt" not in filename or "lats" not in filename:
            continue
        data, new_offset = self.read_logs(os.path.join(path, filename), offset)
        logs[filename] = data
    return logs, new_offset

def get_sys_logs(self, path, offset):
logs = {}
new_offset = 0
Expand Down Expand Up @@ -360,17 +370,19 @@ def logs_fn(self):
url = f"http://{self.fetch_ip}:{self.fetch_port}/executor-log"
executor_log_path = os.path.join(self.log_dir, self.executor_name, self.executor_uuid)

wlog_offset, oplog_offset, sys_log_offset = 0, 0, 0
wlog_offset, oplog_offset, latslog_offset, sys_log_offset = 0, 0, 0, 0
while not self.global_queue_thread.stopped():
wlogs, wlog_offset = self.get_worker_logs(executor_log_path, wlog_offset)
oplogs, oplog_offset = self.get_op_logs(executor_log_path, oplog_offset)
latslogs, latslog_offset = self.get_lats_logs(executor_log_path, latslog_offset)
syslogs, sys_log_offset = self.get_sys_logs(executor_log_path, sys_log_offset)

payload = {
'executor_name': self.executor_name,
'executor_uuid': self.executor_uuid,
'worker_logs': json.dumps(wlogs),
'output_processor_logs': json.dumps(oplogs),
'lats_logs': json.dumps(latslogs),
'syslogs': json.dumps(syslogs),
}

Expand All @@ -388,12 +400,15 @@ def logs_fn(self):
# send final logs before stopping
wlogs, wlog_offset = self.get_worker_logs(executor_log_path, wlog_offset)
oplogs, oplog_offset = self.get_op_logs(executor_log_path, oplog_offset)
latslogs, latslog_offset = self.get_lats_logs(executor_log_path, latslog_offset)
syslogs, sys_log_offset = self.get_sys_logs(executor_log_path, sys_log_offset)

payload = {
'executor_name': self.executor_name,
'executor_uuid': self.executor_uuid,
'worker_logs': json.dumps(wlogs),
'output_processor_logs': json.dumps(oplogs),
'lats_logs': json.dumps(latslogs),
'syslogs': json.dumps(syslogs),
}

Expand Down
5 changes: 4 additions & 1 deletion output_processor/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def output_processor_fn(rank: int, output_log_path: str, event: threading.Event,
# sys.stdout = Tee(sys.stdout, f)

sys.stdout = Logger(os.path.join(output_log_path, "output_processor_logs.txt"), 'a+')

lats_f = open(os.path.join(output_log_path, "lats.txt"), "a+")
from common.db import DataBase
from output_processor import queue, broker

Expand Down Expand Up @@ -191,6 +191,9 @@ def thread_fn(rank, event: threading.Event):
with open(os.path.join(lats_path, job.assignment_id, f"{job.assignment_id}_{job.submission_id}.json"), "w+") as lats_fp:
json.dump(lats, lats_fp, ensure_ascii=False, indent=4)

lats_f.write(f"[{get_datetime()}] [output_processor]\tTeam : {teamId} Assignment ID : {assignmentId}_{submissionId} | Wait : {lats_summary['waiting_time']:.4f}s Processing : {lats_summary['processing_time']:.4f}s Total: {lats_summary['total_time']:.4f}s\n")
lats_f.flush()

#end of thread_fn
threads : List[threading.Thread] = []
thread_events : List[threading.Event] = []
Expand Down
6 changes: 3 additions & 3 deletions scripts/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ windows:
- flask_backend:
- sleep 10
- gunicorn -w 4 --preload --timeout 90 --bind 0.0.0.0:9000 "flask_backend.backend:createApp()"
- mailer:
- sleep 12
- python3 -m common.mailer
# - mailer:
# - sleep 12
# - python3 -m common.mailer
3 changes: 0 additions & 3 deletions scripts/evaluator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ windows:
- mailer:
- sleep 10
- python3 -m common.mailer
- output:
- sleep 10
- python3 -m output_processor.output
- executor:
- sleep 15
- python3 -m job_tracker.executor
Loading

0 comments on commit 6fe257e

Please sign in to comment.