[#181] Don't use load_conf() to set extra attributes on the configuration dictionary
riley-harper committed Dec 13, 2024
1 parent 4c6e602 commit 46f79e3
Showing 1 changed file with 25 additions and 19 deletions.
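In short: before this commit, load_conf() both read the config file and grafted derived runtime values (derby_dir, warehouse_dir, spark_tmp_dir, log_dir, python, run_name) onto the configuration dictionary. After it, the CLI calls load_conf_file() directly and computes those values from the config file's stem plus the fixed HLINK_DIR. A minimal sketch of the new pattern, condensed from the diff below (the derived_paths helper is hypothetical, for illustration only, not part of hlink):

    from pathlib import Path
    import sys

    # Fixed root for runtime state, as introduced in the diff below.
    HLINK_DIR = Path("./hlink_config")

    def derived_paths(conf_path):
        # Hypothetical helper: mirrors how the new _get_spark() and
        # _setup_logging() derive per-run paths from the config file's stem.
        run_name = conf_path.stem
        return {
            "derby_dir": HLINK_DIR / "derby" / run_name,
            "warehouse_dir": HLINK_DIR / "warehouse" / run_name,
            "tmp_dir": HLINK_DIR / "tmp" / run_name,
            "log_dir": HLINK_DIR / "logs",
            "python": sys.executable,
        }

    # e.g. derived_paths(Path("conf/my_run.toml"))["derby_dir"]
    # == Path("hlink_config/derby/my_run")

The payoff is that run_conf stays a faithful copy of what the user wrote, and the Spark and logging setup no longer depend on side effects of the loader.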
44 changes: 25 additions & 19 deletions hlink/scripts/main.py
@@ -17,6 +17,8 @@
 from typing import Any
 import uuid
 
+from pyspark.sql import SparkSession
+
 from hlink.spark.session import SparkConnection
 from hlink.configs.load_config import load_conf_file
 from hlink.errors import SparkError, UsageError
@@ -26,6 +28,7 @@
 from hlink.scripts.lib.conf_validations import analyze_conf
 from hlink.scripts.lib.table_ops import drop_all_tables
 
+HLINK_DIR = Path("./hlink_config")
 logger = logging.getLogger(__name__)


@@ -72,7 +75,6 @@ def load_conf(conf_name: str, user: str) -> tuple[Path, dict[str, Any]]:
         conf["python"] = global_conf["python"]
 
     conf["run_name"] = run_name
-    print(f"*** Using config file {path}")
     return path, conf


@@ -86,7 +88,8 @@ def cli():
 
     try:
         if args.conf:
-            conf_path, run_conf = load_conf(args.conf, args.user)
+            conf_path, run_conf = load_conf_file(args.conf)
+            print(f"*** Using config file {conf_path}")
         else:
             raise Exception(
                 "ERROR: You must specify a config file to use by including either the --run or --conf flag in your program call."
@@ -104,20 +107,19 @@
         traceback.print_exception("", err, None)
         sys.exit(1)
 
-    _setup_logging(conf_path, run_conf)
+    run_name = conf_path.stem
+    _setup_logging(conf_path, run_name)
 
     logger.info("Initializing Spark")
     spark_init_start = timer()
-    spark = _get_spark(run_conf, args)
+    spark = _get_spark(run_name, args)
    spark_init_end = timer()
     spark_init_time = round(spark_init_end - spark_init_start, 2)
     logger.info(f"Initialized Spark in {spark_init_time}s")
 
     history_file = os.path.expanduser("~/.history_hlink")
     _read_history_file(history_file)
 
-    run_name = run_conf["run_name"]
-
     try:
         if args.execute_tasks:
             main = Main(
@@ -194,13 +196,18 @@ def _parse_args():
     return parser.parse_args()
 
 
-def _get_spark(run_conf, args):
+def _get_spark(run_name: str, args: argparse.Namespace) -> SparkSession:
+    derby_dir = HLINK_DIR / "derby" / run_name
+    warehouse_dir = HLINK_DIR / "warehouse" / run_name
+    tmp_dir = HLINK_DIR / "tmp" / run_name
+    python = sys.executable
+
     spark_connection = SparkConnection(
-        run_conf["derby_dir"],
-        run_conf["warehouse_dir"],
-        run_conf["spark_tmp_dir"],
-        run_conf["python"],
-        "linking",
+        derby_dir=derby_dir,
+        warehouse_dir=warehouse_dir,
+        tmp_dir=tmp_dir,
+        python=python,
+        db_name="linking",
     )
     spark = spark_connection.local(
         cores=args.cores, executor_memory=args.executor_memory
@@ -236,27 +243,26 @@ def _cli_loop(spark, args, run_conf, run_name):
             main.cmdloop()
             if main.lastcmd == "reload":
                 logger.info("Reloading config file")
-                conf_path, run_conf = load_conf(args.conf, args.user)
+                conf_path, run_conf = load_conf_file(args.conf)
+                print(f"*** Using config file {conf_path}")
             else:
                 break
         except Exception as err:
             report_and_log_error("", err)
 
 
-def _setup_logging(conf_path, conf):
-    log_dir = Path(conf["log_dir"])
+def _setup_logging(conf_path, run_name):
+    log_dir = HLINK_DIR / "logs"
     log_dir.mkdir(exist_ok=True, parents=True)
 
     user = getpass.getuser()
     session_id = uuid.uuid4().hex
-    conf_name = conf["run_name"]
     hlink_version = importlib.metadata.version("hlink")
 
-    log_file = log_dir / f"{conf_name}-{session_id}.log"
+    log_file = log_dir / f"{run_name}-{session_id}.log"
 
-    # format_string = f"%(levelname)s %(asctime)s {user} {session_id} %(message)s -- {conf['conf_path']}"
     format_string = "%(levelname)s %(asctime)s -- %(message)s"
-    print(f"*** Hlink log: {log_file}")
+    print(f"*** Hlink log: {log_file.absolute()}")
 
     logging.basicConfig(filename=log_file, level=logging.INFO, format=format_string)

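Net effect, assuming the relative HLINK_DIR above: a config file named my_run.toml (the name is illustrative; any config file's stem works the same way) keeps its per-run state under ./hlink_config/derby/my_run, ./hlink_config/warehouse/my_run, and ./hlink_config/tmp/my_run, with the session log at ./hlink_config/logs/my_run-<session_id>.log.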