Skip to content

Improve error handling and made it XDG compilant #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,5 @@ tests/

*.mp3
cache.sqlite
tags
.vscode/settings.json
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
],
"files.autoSave": "off",
"editor.wordWrap": "wordWrapColumn",
"workbench.colorTheme": "GitHub Dark",
"workbench.colorTheme": "GitHub Light",
"editor.minimap.autohide": true,
"editor.minimap.renderCharacters": false,
"editor.experimentalWhitespaceRendering": "font",
Expand Down
27 changes: 19 additions & 8 deletions radioactive/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,25 @@ def main():
def signal_handler(sig, frame):
global ffplay
global player
log.debug("You pressed Ctrl+C!")
log.debug("Stopping the radio")
if ffplay and ffplay.is_playing:
ffplay.stop()
# kill the player
player.stop()

log.info("Exiting now")
log.debug("SIGINT received. Initiating shutdown.")

# Stop ffplay if it exists and is currently playing
try:
if ffplay and getattr(ffplay, "is_playing", False):
log.debug("Stopping ffplay...")
ffplay.stop()
except Exception as e:
log.error(f"Error while stopping ffplay: {e}")

# Stop the player if it exists
try:
if player:
log.debug("Stopping player...")
player.stop()
except Exception as e:
log.error(f"Error while stopping player: {e}")

log.info("Shutdown complete. Exiting now.")
sys.exit(0)


Expand Down
2 changes: 1 addition & 1 deletion radioactive/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class App:
def __init__(self):
self.__VERSION__ = "2.9.1" # change this on every update #
self.__VERSION__ = "2.9.2" # change this on every update #
self.pypi_api = "https://pypi.org/pypi/radio-active/json"
self.remote_version = ""

Expand Down
8 changes: 3 additions & 5 deletions radioactive/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys

from zenlog import log
from radioactive.default_path import default_appconfig_file_path, handle_default_path


def write_a_sample_config_file():
Expand All @@ -24,11 +25,8 @@ def write_a_sample_config_file():
"player": "ffplay",
}

# Get the user's home directory
home_directory = os.path.expanduser("~")

# Specify the file path
file_path = os.path.join(home_directory, ".radio-active-configs.ini")
handle_default_path(default_appconfig_file_path)
file_path = os.path.join(default_appconfig_file_path, ".radio-active-configs.ini")

try:
# Write the configuration to the file
Expand Down
45 changes: 45 additions & 0 deletions radioactive/default_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os
import sys
from zenlog import log

def get_xdg_paths():
"""
Retrieve the XDG base directories with proper defaults.
XDG_CONFIG_HOME defaults to ~/.config.
XDG_DATA_HOME defaults to ~/.local/share.
XDG_CACHE_HOME defaults to ~/.cache.
"""
home = os.path.expanduser("~")
config_home = os.getenv("XDG_CONFIG_HOME", os.path.join(home, ".config"))
data_home = os.getenv("XDG_DATA_HOME", os.path.join(home, ".local", "share"))
cache_home = os.getenv("XDG_CACHE_HOME", os.path.join(home, ".cache"))
return config_home, data_home, cache_home

xdg_config_home, xdg_data_home, xdg_cache_home = get_xdg_paths()

# Data files (record and station files) remain under XDG_DATA_HOME.
default_record_file_path = os.path.join(xdg_data_home, "radioactive")
default_station_file_path = os.path.join(xdg_data_home, "radioactive")

# Configuration files now use XDG_CONFIG_HOME.
default_appconfig_file_path = os.path.join(xdg_config_home, "radioactive")

def ensure_directory(path):
"""
Ensure the directory exists, otherwise attempt to create it.
Exits if the directory cannot be created.
"""
try:
os.makedirs(path, exist_ok=True)
log.debug(f"Directory ensured: {path}")
except Exception as e:
log.error(f"Could not create directory {path}: {e}")
sys.exit(1)

def handle_default_path(default_path):
"""
Handle default path by ensuring the directory exists.
"""
log.debug(f"Ensuring default directory: {default_path}")
ensure_directory(default_path)

24 changes: 15 additions & 9 deletions radioactive/ffplay.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ def kill_background_ffplays():
p.terminate()
count += 1
log.info(f"Terminated ffplay process with PID {pid}")
# Ensure process has time to terminate
sleep(0.5)
if p.is_running():
p.kill()
log.debug(f"Forcefully killing ffplay process with PID {pid}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# Handle exceptions, such as processes that no longer exist or access denied
log.debug("Could not terminate a ffplay processes!")
log.debug("Could not terminate a ffplay process!")
if count == 0:
log.info("No background radios are running!")

Expand All @@ -40,6 +41,7 @@ def __init__(self, URL, volume, loglevel):
self.loglevel = loglevel
self.is_playing = False
self.process = None
self.is_running = False

self._check_ffplay_installation()
self.start_process()
Expand Down Expand Up @@ -70,37 +72,40 @@ def start_process(self):
stderr=subprocess.PIPE,
text=True,
)

self.is_running = True
self.is_playing = True
self._start_error_thread()

except Exception as e:
log.error("Error while starting radio: {}".format(e))
self.is_running = False
self.is_playing = False

def _start_error_thread(self):
error_thread = threading.Thread(target=self._check_error_output)
error_thread.daemon = True
error_thread.start()

def _check_error_output(self):
while self.is_running:
while self.is_running and self.process and self.process.stderr:
stderr_result = self.process.stderr.readline()
if stderr_result:
self._handle_error(stderr_result)
self.is_running = False
self.stop()
break
sleep(2)

def _handle_error(self, stderr_result):
print()
log.error("Could not connect to the station")
try:
log.debug(stderr_result)
log.error(stderr_result.split(": ")[1])
parts = stderr_result.split(": ")
if len(parts) > 1:
log.error(parts[1].strip())
else:
log.error(stderr_result.strip())
except Exception as e:
log.debug("Error: {}".format(e))
pass

def terminate_parent_process(self):
parent_pid = os.getppid()
Expand Down Expand Up @@ -132,7 +137,7 @@ def play(self):
self.start_process()

def stop(self):
if self.is_playing:
if self.is_playing and self.process:
try:
self.process.kill()
self.process.wait(timeout=5)
Expand All @@ -145,6 +150,7 @@ def stop(self):
raise
finally:
self.is_playing = False
self.is_running = False
self.process = None
else:
log.debug("Radio is not currently playing")
Expand Down
37 changes: 16 additions & 21 deletions radioactive/last_station.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,34 @@
""" This module saves the current playing station information to a hidden file,
and loads the data when no arguments are provide """
and loads the data when no arguments are provided. """

import json
import os.path

import os
from zenlog import log

from radioactive.default_path import default_station_file_path, handle_default_path

class Last_station:

"""Saves the last played radio station information,
when user don't provide any -S or -U it looks for the information.

on every successful run, it saves the station information.
The file it uses to store the data is a hidden file under users' home directory
"""Saves the last played radio station information.

When the user doesn't provide any -S or -U, it looks for the information.
On every successful run, it saves the station information.
The file it uses to store the data is a hidden file under the user's home directory.
"""

def __init__(self):
self.last_station_path = None

self.last_station_path = os.path.join(
os.path.expanduser("~"), ".radio-active-last-station"
)
handle_default_path(default_station_file_path)
self.last_station_path = os.path.join(default_station_file_path, ".radioactive-last-station")

def get_info(self):
"""Loads the last station information from the hidden file."""
try:
with open(self.last_station_path, "r") as f:
last_station = json.load(f)
return last_station
except Exception:
return ""
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
log.warning("No last station information found or invalid JSON.")
return None

def save_info(self, station):
"""dumps the current station information as a json file"""

"""Saves the current station information as a JSON file."""
log.debug("Dumping station information")
with open(self.last_station_path, "w") as f:
json.dump(station, f)
57 changes: 47 additions & 10 deletions radioactive/mpv.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import subprocess
import sys
from shutil import which

from zenlog import log


Expand All @@ -12,43 +11,81 @@ def __init__(self):
log.debug(f"{self.program_name}: {self.exe_path}")

if self.exe_path is None:
log.critical(f"{self.program_name} not found, install it first please")
log.critical(f"{self.program_name} not found. Please install it first.")
sys.exit(1)

self.is_running = False
self.process = None
self.url = None

def _construct_mpv_commands(self, url):
"""Constructs the command to run mpv with the given URL."""
return [self.exe_path, url]

def start(self, url):
"""Starts the mpv player with the specified URL."""
if not self._is_valid_url(url):
log.error(f"Invalid URL provided: {url}")
return

# If player is already running, do not start another process.
if self.is_running and self.process:
log.debug(f"{self.program_name} is already running with PID {self.process.pid}.")
return

self.url = url
mpv_commands = self._construct_mpv_commands(url)

try:
self.process = subprocess.Popen(
mpv_commands,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
self.is_running = True
log.debug(
f"player: {self.program_name} => PID {self.process.pid} initiated"
)
log.debug(f"Player: {self.program_name} => PID {self.process.pid} initiated")

except Exception as e:
except FileNotFoundError:
log.critical(f"{self.program_name} executable not found at {self.exe_path}.")
sys.exit(1)
except subprocess.SubprocessError as e:
log.error(f"Error while starting player: {e}")
except Exception as e:
log.error(f"Unexpected error while starting player: {e}")

def stop(self):
if self.is_running:
self.process.kill()
self.is_running = False
"""Stops the mpv player if it is running."""
if self.is_running and self.process:
try:
self.process.terminate() # Use terminate for a graceful shutdown
self.process.wait(timeout=10)
log.debug(f"Player: {self.program_name} stopped.")
except subprocess.TimeoutExpired:
log.warning(f"{self.program_name} did not terminate gracefully, killing process.")
self.process.kill()
self.process.wait()
except Exception as e:
log.error(f"Error stopping {self.program_name}: {e}")
finally:
self.is_running = False
self.process = None
else:
log.debug(f"Player: {self.program_name} is not running.")

def toggle(self):
"""Toggles the mpv player state between running and stopped."""
if self.is_running:
self.stop()
else:
if self.url is None:
log.error("No URL set; cannot start the player.")
return
self.start(self.url)

def _is_valid_url(self, url):
"""Validates the provided URL."""
if not isinstance(url, str) or not url:
return False
# Basic validation: check if the URL starts with http or https
return url.startswith("http://") or url.startswith("https://")
Loading