New Features
iVishalr committed Apr 18, 2023
1 parent 331de46 commit ca4ee75
Showing 10 changed files with 518 additions and 213 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ compile-test/
output/
configs/
logs/
lats/
# answer/A1T1
# answer/A1T2
*logs.txt
2 changes: 2 additions & 0 deletions README.md
@@ -1,5 +1,7 @@
# BigHOST

This is the official repository of BigHOST. BigHOST is an autograding system developed to evaluate Big Data assignments. It employs a simple, scalable architecture that provides a fair environment for executing multiple Big Data jobs in parallel.

## Citation

This work has been accepted at the _2023 IEEE/ACM 23rd International Symposium on Cluster, Cloud and Internet Computing Workshops (CCGridW)_.
6 changes: 5 additions & 1 deletion common/email_service.py
@@ -1,11 +1,15 @@
import os
import time
import smtplib
from common import dbuser_rr, dbuser_ec

from common.db import DataBase
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

db = DataBase()
dbuser_rr = db.metadata["RR"]["collections"]["users"]
dbuser_ec = db.metadata["EC"]["collections"]["users"]

class EmailingService:
@staticmethod
def get_connection(user, password) -> smtplib.SMTP_SSL:
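Here, the user collections are now looked up from a shared `DataBase` object instead of being imported from `common`, and `get_connection` returns an authenticated `smtplib.SMTP_SSL` session. A minimal sketch of such a helper, assuming a Gmail SMTP host and port (both illustrative, not taken from the repository):

```python
import smtplib

def get_connection(user: str, password: str) -> smtplib.SMTP_SSL:
    # Host and port are assumptions for illustration; any SSL-capable SMTP server works.
    server = smtplib.SMTP_SSL("smtp.gmail.com", 465)
    server.login(user, password)
    return server
```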
8 changes: 6 additions & 2 deletions config/evaluator.json
@@ -19,7 +19,11 @@
"timeout": 30,
"log_dir": "/home/vishalr/Desktop/backend-2022/executor-logs/",
"log_timeout": 120,
"sys_timeout": 1
"sys_timeout": 1,
"output_processor_threads": 1,
"output_processor_timeout": 30,
"store_lats": true,
"lats_dir": "/home/vishalr/Desktop/backend-2022/lats/"
},

"docker": {
@@ -30,7 +34,7 @@
"cpu_limit": 6,
"taskset": true,
"memory_limit": "8000m",
"shared_output_dir": "/home/vishalramesh01/backend-2022/output/",
"shared_output_dir": "/home/vishalr/Desktop/backend-2022/output/",
"docker_output_dir": "/output",
"docker_memswapiness": 0,
"spawn_wait": 40,
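The evaluator config gains four keys: a thread count and timeout for the output processor, plus a `store_lats` flag and `lats_dir` path (likely where the per-job timestamps recorded via `job.record` are written). A hedged sketch of how a process might read them; the config path, the enclosing section name, and the defaults are assumptions for illustration:

```python
import json

# The "evaluator" section name and the fallback defaults below are assumptions;
# the real structure of evaluator.json may differ.
with open("config/evaluator.json") as fp:
    cfg = json.load(fp)

section = cfg.get("evaluator", {})
op_threads = section.get("output_processor_threads", 1)
op_timeout = section.get("output_processor_timeout", 30)
store_lats = section.get("store_lats", False)
lats_dir = section.get("lats_dir", "./lats/")
```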
51 changes: 44 additions & 7 deletions flask_backend/backend.py
@@ -17,6 +17,8 @@
from flask import Flask, request, jsonify
from signal import signal, SIGPIPE, SIG_DFL
from flask_backend.parser import SanityCheckerASTVisitor
from job_tracker.job import MRJob, SparkJob, KafkaJob, job_template_selector
from pprint import pprint

signal(SIGPIPE, SIG_DFL)

@@ -113,12 +115,23 @@ def update_submission(marks, message, data, send_mail=False):
@cross_origin()
def sanity_check():
'''
Currently assuming the assignment to be a MR Job
BigHOST Sanity Checker
'''

jobs = json.loads(request.data)
data = jobs

job_template = job_template_selector(data["assignmentId"])

job = job_template(
team_id=data["teamId"],
assignment_id=data["assignmentId"],
submission_id=data["submissionId"]
)

job.record("received")
update_submission(marks=-1, message='Sanity Checking', data=data)
job.record("sanity_check_start")

compile_path = f"{os.path.join(os.getcwd(),'compile-test', str(data['submissionId']))}"

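The handler now selects a job class from the assignment id and timestamps each lifecycle stage with `job.record(...)`. The real classes live in `job_tracker/job.py`; the sketch below is purely illustrative, and the assignment-to-class mapping is an assumption based on the `A1`/`A2` check later in this diff:

```python
import time

class Job:
    """Illustrative stand-in for the job template in job_tracker/job.py."""

    def __init__(self, team_id, assignment_id, submission_id):
        self.team_id = team_id
        self.assignment_id = assignment_id
        self.submission_id = submission_id
        self.timestamps = {}

    def record(self, event: str) -> None:
        # Store a wall-clock timestamp for each lifecycle event ("received", ...).
        self.timestamps[event] = time.time()

class MRJob(Job):
    pass

class SparkJob(Job):
    pass

class KafkaJob(Job):
    pass

def job_template_selector(assignment_id: str):
    # Assumed mapping: A1/A2 -> MapReduce, A3 -> Spark, anything else -> Kafka.
    if "A1" in assignment_id or "A2" in assignment_id:
        return MRJob
    if "A3" in assignment_id:
        return SparkJob
    return KafkaJob
```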
@@ -127,7 +140,7 @@ def sanity_check():

_ = open(os.path.join(compile_path, "__init__.py"), "w+").close() # required for pylint to work

if "A3" not in data["assignmentId"]:
if "A1" in data["assignmentId"] or "A2" in data["assignmentId"]:
mapper_data = data["mapper"]
reducer_data = data['reducer']
mapper_name = f"{data['teamId']}-{data['assignmentId']}-mapper.py"
@@ -250,10 +263,25 @@ def sanity_check():
update_submission(marks=-1, message='Sanity Check Passed', data=data)

data["timeout"] = get_timeouts(assignment_id=data['assignmentId'])

job.record("sanity_check_end")

if isinstance(job, MRJob):
job.mapper = data["mapper"]
job.reducer = data["reducer"]
elif isinstance(job, SparkJob):
job.spark = data["spark"]
elif isinstance(job, KafkaJob):
job.producer = data["producer"]
job.consumer = data["consumer"]

job.timeout = data["timeout"]

update_submission(marks=-1, message='Queued for Execution', data=data)
update_submission(marks=-1, message='Queued for Execution', data=job.get_db_fields())

data = pickle.dumps(data)
job.record("waiting_queue_entry")

data = pickle.dumps(job)
queue.enqueue(data)

res = {"msg": "Queued", "len": len(queue)}
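Fully populated job objects are pickled before entering the waiting queue, so the executor side can restore them unchanged. A minimal illustration of that round-trip, using a plain `deque` as a stand-in for BigHOST's queue:

```python
import pickle
from collections import deque

queue = deque()

# Producer side: serialize the job (a plain dict stands in for a Job object here).
queue.append(pickle.dumps({"team_id": "T01", "submission_id": "123"}))

# Consumer side: restore the original object.
job_fields = pickle.loads(queue.popleft())
print(job_fields["submission_id"])  # -> 123
```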
@@ -280,7 +308,7 @@ def get_jobs():
res = {"msg": "Submission Queue is currently empty.", "len": len(queue), "num_submissions": 0, "status": 200}
return jsonify(res)

data = []
data = {}
i = 0
while i < prefetch_factor:
queue_data = queue.dequeue()
@@ -290,7 +318,9 @@

_, serialized_job = queue_data
job = pickle.loads(serialized_job)
data.append(job)
job.record("waiting_queue_exit")
# pprint(job.__dict__)
data[f"job{i+1}"] = job.__dict__
i += 1

length = len(data)
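Dequeued jobs are now returned as a dictionary keyed `job1`, `job2`, ... with each value being the job's `__dict__`, rather than a list of pickled objects. A hypothetical consumer on the executor side might look like this; the URL, port, and attribute names are assumptions:

```python
import requests

# Endpoint path and port are placeholders; the "jobs" field mirrors the response
# assembled above ("jobN" -> a Job's __dict__).
resp = requests.get("http://localhost:9000/get-jobs", timeout=30)
payload = resp.json()

for key, fields in payload.get("jobs", {}).items():
    # Attribute names such as team_id/submission_id are assumed for illustration.
    print(key, fields.get("team_id"), fields.get("submission_id"))
```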
@@ -303,6 +333,7 @@
"status": 200,
"jobs": data
}

return jsonify(res)

@app.route("/register_executor", methods=["POST"])
@@ -405,7 +436,13 @@ def executor_log():
if not os.path.exists(executor_log_path):
os.makedirs(executor_log_path)

logs = json.loads(data["logs"])
logs = json.loads(data["worker_logs"])
for logname in logs:
f = open(os.path.join(executor_log_path, logname), "a+")
f.write(logs[logname])
f.close()

logs = json.loads(data["output_processor_logs"])
for logname in logs:
f = open(os.path.join(executor_log_path, logname), "a+")
f.write(logs[logname])
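The log endpoint now expects two JSON-encoded maps, `worker_logs` and `output_processor_logs`, each mapping a log filename to text that is appended under the executor's log directory. A hedged sketch of the executor-side request; the endpoint URL, port, and extra fields are assumptions:

```python
import json
import requests

payload = {
    "executor_name": "executor-1",  # assumed identifier field
    "worker_logs": json.dumps({"worker-1.log": "job 123 finished\n"}),
    "output_processor_logs": json.dumps({"output-processor.log": "processed 123\n"}),
}

# Endpoint path and port are placeholders for illustration.
requests.post("http://localhost:9000/executor-log", json=payload, timeout=30)
```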
(Diffs for the remaining 5 changed files are not shown.)
