-
Notifications
You must be signed in to change notification settings - Fork 2
/
dashboard.py
135 lines (114 loc) · 5.6 KB
/
dashboard.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import json
from rich.console import Console
from rich.table import Table
from datetime import datetime
# Inputs consumed by the dashboard. All paths are relative, so they resolve
# against the current working directory of the process.
status_file = 'status.json'  # JSON object with scrape/upload/story/deletion timestamps, read by display_dashboard
log_file = 'upload_log.txt'  # text log, one entry per line; entries are compared against .mp4 names in downloads_dir
random_upload_time_file = 'random-upload-times.json'  # JSON whose last 10 entries are listed under "Random Upload Times"
random_waits_file = 'random-waits.json'  # JSON whose last 10 entries are listed under "Random Wait Times"
downloads_dir = 'downloads'  # directory scanned for *.mp4 files by get_file_counts
# Shared rich console used for every message and table printed by this module.
console = Console()
def read_json_file(file_path):
    """Read a JSON file and return its parsed contents.

    Args:
        file_path: Path of the JSON file to load.

    Returns:
        The decoded JSON value. An empty dictionary is returned (after
        printing a red message to the console) when the file does not exist,
        cannot be opened or read, or does not contain valid UTF-8 JSON.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError:
        console.print(f"[bold red]{file_path} not found.[/bold red]")
    except (OSError, ValueError) as e:
        # OSError: path is a directory, permission denied, I/O failure.
        # ValueError: json.JSONDecodeError and UnicodeDecodeError are both
        # subclasses, so malformed JSON and undecodable bytes land here.
        console.print(f"[bold red]Error reading {file_path}: {e}[/bold red]")
    return {}
def read_text_file(file_path):
    """Load a text file as a list of lines with surrounding whitespace removed.

    A red message is printed and an empty list is returned when the path does
    not exist or when any exception is raised while opening or reading it.
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r') as handle:
                # Consume the file inside the ``with`` so it is still open.
                stripped_lines = list(map(str.strip, handle))
        except Exception as exc:
            console.print(f"[bold red]Error reading {file_path}: {exc}[/bold red]")
            return []
        return stripped_lines

    console.print(f"[bold red]{file_path} not found.[/bold red]")
    return []
def get_file_counts():
    """Summarise the .mp4 files in the downloads directory.

    Returns:
        A 4-tuple ``(total, uploaded, unuploaded, size_mb)`` where ``total`` is
        the number of ``.mp4`` entries in ``downloads_dir``, ``uploaded`` is
        the list of lines read from ``log_file``, ``unuploaded`` is the number
        of ``.mp4`` names that do not appear in that list, and ``size_mb`` is
        the combined size of the ``.mp4`` files in mebibytes. When the
        directory does not exist a message is printed and ``(0, [], 0, 0)`` is
        returned.
    """
    if not os.path.exists(downloads_dir):
        console.print("[bold red]Downloads directory not found.[/bold red]")
        return 0, [], 0, 0

    mp4_names = []
    for entry in os.listdir(downloads_dir):
        if entry.endswith('.mp4'):
            mp4_names.append(entry)

    uploaded_files = read_text_file(log_file)
    already_logged = set(uploaded_files)  # O(1) membership tests below

    pending_count = 0
    for name in mp4_names:
        if name not in already_logged:
            pending_count += 1

    total_bytes = 0
    for name in mp4_names:
        total_bytes += os.path.getsize(os.path.join(downloads_dir, name))
    size_mb = total_bytes / (1024 * 1024)

    return len(mp4_names), uploaded_files, pending_count, size_mb
def format_time(timestamp):
    """Convert a Unix timestamp to a ``YYYY-MM-DD HH:MM:SS`` local-time string.

    Args:
        timestamp: Seconds since the epoch as a number or numeric string.

    Returns:
        ``"N/A"`` when the value is falsy (``None``, ``0``, ``""``) or the
        literal string ``'None'``; ``"Invalid timestamp"`` when the value
        cannot be converted to a date; otherwise the formatted local time.
    """
    if not timestamp or timestamp == 'None':
        return "N/A"
    try:
        return datetime.fromtimestamp(float(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
    except (ValueError, TypeError, OverflowError, OSError):
        # fromtimestamp raises OverflowError/OSError (not ValueError) for
        # infinite or out-of-range values; treat those as invalid too so one
        # bad status entry cannot abort the whole dashboard.
        return "Invalid timestamp"
def _print_recent_timings(title, entries, limit=10):
    """Print a bold heading followed by the last ``limit`` timing records."""
    console.print(f"[bold]{title}[/bold]")
    # read_json_file returns {} on a missing/corrupt file, and the file itself
    # may hold a non-list value; slicing either would raise, so show nothing.
    if not isinstance(entries, list):
        return
    for item in entries[-limit:]:
        if isinstance(item, dict):
            time_record = item.get('time', 'N/A')
            profile_reel_id = item.get('profile_reel_id', 'N/A')
            console.print(f"- {time_record} seconds for {profile_reel_id}")
        else:
            console.print(f"- {item}")


def display_dashboard():
    """Display the status dashboard on the shared console.

    Reads ``status_file``, ``log_file``, the two random-timing JSON files and
    the downloads directory, then prints: scrape/upload times, .mp4 file
    counts and folder size, the last 10 uploads, the scraped reels, the last
    10 random upload/wait times, story-upload/deletion times and the next
    file to upload.

    Nothing is displayed when the status file is missing, empty, or does not
    contain a JSON object. Missing or malformed random-timing files only leave
    their sections empty instead of aborting the dashboard.
    """
    status = read_json_file(status_file)
    if not status:
        return
    if not isinstance(status, dict):
        # Every field below is looked up with .get(), which needs a mapping.
        console.print(f"[bold red]{status_file} does not contain a JSON object.[/bold red]")
        return

    uploads = read_text_file(log_file)
    random_upload_times = read_json_file(random_upload_time_file)
    random_waits = read_json_file(random_waits_file)
    total_files, uploaded_files, unuploaded_files, folder_size = get_file_counts()

    console.print("=" * 80, justify="left")
    console.print("[bold yellow]Instagram Thefty Poster Dashboard[/bold yellow]", justify="left")
    console.print("=" * 80, justify="left")

    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("Scrape Status", justify="center")
    table.add_column("Upload Status", justify="center")
    table.add_row(
        f"Last Scrape Time: {format_time(status.get('last_scrape_time'))}\nNext Scrape Time: {format_time(status.get('next_scrape_time'))}",
        f"Last Upload Time: {format_time(status.get('last_upload_time'))}\nNext Upload Time: {format_time(status.get('next_upload_time'))}"
    )
    console.print(table)

    file_table = Table(show_header=True, header_style="bold magenta")
    file_table.add_column("Metric", justify="center")
    file_table.add_column("Value", justify="center")
    file_table.add_row("Total .mp4 Files", str(total_files))
    file_table.add_row("Uploaded .mp4 Files", str(len(uploaded_files)))
    file_table.add_row("Unuploaded .mp4 Files", str(unuploaded_files))
    file_table.add_row("Downloads Folder Size (MB)", f"{folder_size:.2f}")
    console.print(file_table)

    console.print("[bold]Last 10 Uploads[/bold]")
    for upload in uploads[-10:]:
        console.print(f"- {upload}")

    console.print("[bold]Reels Scraped[/bold]")
    # ``or []`` also covers an explicit JSON null stored under the key.
    for reel in status.get('reels_scraped') or []:
        console.print(f"- {reel}")

    _print_recent_timings("Random Upload Times", random_upload_times)
    _print_recent_timings("Random Wait Times", random_waits)

    table2 = Table(show_header=True, header_style="bold magenta")
    table2.add_column("Story Upload Status", justify="center")
    table2.add_column("Deletion Status", justify="center")
    table2.add_row(
        f"Last Story Upload Time: {format_time(status.get('last_story_upload_time'))}\nNext Story Upload Time: {format_time(status.get('next_story_upload_time'))}",
        f"Last Deletion Time: {format_time(status.get('last_delete_time'))}\nNext Deletion Time: {format_time(status.get('next_deletion_time'))}"
    )
    console.print(table2)

    next_file = status.get('next_file_to_upload', 'N/A')
    console.print(f"[bold]Next File to Upload:[/bold] {next_file}")
# Render the dashboard only when this file is executed as a script, so that
# importing the module has no output side effects.
if __name__ == "__main__":
    display_dashboard()