Skip to content

Commit

Permalink
Add server charts
Browse files Browse the repository at this point in the history
  • Loading branch information
joinemm committed Nov 21, 2023
1 parent ca5a9b3 commit 9fbfbdd
Showing 1 changed file with 252 additions and 31 deletions.
283 changes: 252 additions & 31 deletions cogs/fm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import math
import os
import re
import sys
import urllib.parse
from dataclasses import dataclass
from enum import Enum, auto
Expand Down Expand Up @@ -208,6 +209,11 @@ def __init__(self, bot):
self.bot: MisoBot = bot
self.api = LastFmApi(bot)

async def cog_load(self):
await self.api.login(
self.bot.keychain.LASTFM_USERNAME, self.bot.keychain.LASTFM_PASSWORD
)

@commands.group(aliases=["lastfm", "lfm", "lf"])
async def fm(self, ctx: MisoContext):
"""Interact with LastFM using your linked account"""
Expand Down Expand Up @@ -584,7 +590,7 @@ async def recent(self, ctx: MisoContext):
try:
timestamp = f'<t:{track["date"]["uts"]}:R>'
except KeyError:
timestamp = discord.utils.format_dt(arrow.now().datetime, "R")
timestamp = ":notes:"
rows.append(f'**{track["artist"]["#text"]}{track["name"]}** {timestamp}')

await self.paginated_user_stat_embed(
Expand Down Expand Up @@ -674,7 +680,7 @@ async def artist_overview(self, ctx: MisoContext, timeframe: Period, artist: str
f"https://last.fm/user/{ctx.lfm.username}/library/music/"
f"{artist_url_format}?date_preset={timeframe.web_format()}"
)
soup = await self.api.scrape_page(url, authenticated=True)
soup = await self.api.scrape_page(url)

try:
chartlists = soup.select(".chartlist")
Expand Down Expand Up @@ -805,7 +811,7 @@ async def artist_top(
f"https://last.fm/user/{ctx.lfm.username}/library/music/"
f"{artist_url_format}/+{data_type}?date_preset={timeframe.web_format()}"
)
soup = await self.api.scrape_page(url, authenticated=True)
soup = await self.api.scrape_page(url)

formatted_name = artistinfo["name"]
row_items = self.api.get_library_playcounts(soup)
Expand Down Expand Up @@ -1221,40 +1227,46 @@ async def server_nowplaying(self, ctx: MisoContext):
async def server_recent(self, ctx: MisoContext):
"""What this server has recently listened to"""
data = await self.task_for_each_server_member(
ctx.guild, self.api.user_get_recent_tracks
ctx.guild, self.api.user_get_recent_tracks, limit=10
)

if data is None:
return await ctx.send(
"Nobody on this server has connected their Last.fm account yet!"
)

rows = []
track_list = []
for member_data, member in data:
suffix = ""
if member_data["nowplaying"]:
suffix = " :musical_note:"
tracks = member_data["track"]
for track in tracks:
artist_name = track["artist"]["#text"]
track_name = track["name"]
try:
when = int(track["date"]["uts"])
except KeyError:
when = sys.maxsize

artist_name = member_data["artist"]["#text"]
track_name = member_data["name"]
track_list.append([when, artist_name, track_name, member])

if not track_list:
return await ctx.send("Nobody on this server has listened to anything!")

track_list = sorted(track_list, key=lambda x: x[0], reverse=True)

rows = []
for when, artist_name, track_name, member in track_list:
if when == sys.maxsize:
suffix = ":notes:"
else:
suffix = f"<t:{when}:R>"
rows.append(
f"{util.displayname(member)} | **{escape_markdown(artist_name)}** "
f"— ***{escape_markdown(track_name)}***{suffix}"
f"— ***{escape_markdown(track_name)}*** {suffix}"
)

if not rows:
return await ctx.send("Nobody on this server has listened to anything!")

content = (
discord.Embed(color=int(self.LASTFM_RED, 16))
.set_author(
name=f"What has {ctx.guild.name} been listening to?",
icon_url=ctx.guild.icon,
)
.set_footer(
text=f"{len(rows)} / {len(data)} Members are listening to music right now"
)
content = discord.Embed(color=int(self.LASTFM_RED, 16)).set_author(
name=f"What has {ctx.guild.name} been listening to?",
icon_url=ctx.guild.icon,
)

await RowPaginator(content, rows).run(ctx)
Expand Down Expand Up @@ -1353,7 +1365,11 @@ async def server_toptracks(
track_map[name]["score"] += score
track_map[name]["playcount"] += playcount
except KeyError:
track_map[name] = {"score": score, "playcount": playcount}
track_map[name] = {
"score": score,
"playcount": playcount,
"url": track["url"],
}

top_tracks = sorted(
track_map.items(), key=lambda x: x[1]["score"], reverse=True
Expand All @@ -1370,9 +1386,7 @@ async def server_toptracks(
ctx,
rows,
f"Top 100 Tracks ({timeframe.display()})",
image=await self.api.get_artist_image(
top_tracks[0][0].split(" — ")[0].strip("*")
),
image=await self.api.scrape_track_image(top_tracks[0][1]["url"]),
footer=f"Score calculated from top 100 tracks of {len(data)} members",
server_target=True,
)
Expand Down Expand Up @@ -1414,7 +1428,11 @@ async def server_topalbums(
album_map[name]["score"] += score
album_map[name]["playcount"] += playcount
except KeyError:
album_map[name] = {"score": score, "playcount": playcount}
album_map[name] = {
"score": score,
"playcount": playcount,
"image": album["image"][0]["#text"],
}

top_albums = sorted(
album_map.items(), key=lambda x: x[1]["score"], reverse=True
Expand All @@ -1431,13 +1449,216 @@ async def server_topalbums(
ctx,
rows,
f"Top 100 Tracks ({timeframe.display()})",
image=await self.api.get_artist_image(
top_albums[0][0].split(" — ")[0].strip("*")
),
image=LastFmImage.from_url(top_albums[0][1]["image"]),
footer=f"Score calculated from top 100 albums of {len(data)} members",
server_target=True,
)

@server.command(
name="chart",
aliases=["collage"],
usage=(
"[timeframe] [[width]x[height]] "
"['album' | 'artist' | 'recent' | 'notitle' | 'topster' | 'padded']"
),
)
async def server_chart(
self,
ctx: MisoContext,
*args: Union[
Annotated[Period, PeriodArgument],
Annotated[ChartSize, ChartSizeArgument],
ChartOption,
],
):
"""
Collage of your top albums or artists
Usage:
>fm server chart [timeframe] [[width]x[height]]
['album' | 'artist' | 'recent' | 'notitle' | 'topster' | 'padded']"
Defaults to 3x3 weekly albums chart.
"""
timeframe = Period.WEEK
size = ChartSize(3, 3)
for arg in args:
if isinstance(arg, Period):
timeframe = arg
elif isinstance(arg, ChartSize):
if arg.width > 12 or arg.height > 12:
raise exceptions.CommandWarning(
"The maximum width/height of the collage is `12`"
)
size = arg

chart_nodes = []
topster_labels = []
topster = "topster" in args
if "artist" in args:
chart_title = "top artist"
data = await self.task_for_each_server_member(
ctx.guild,
self.api.user_get_top_artists,
limit=size.count,
period=timeframe,
)

if data is None:
return await ctx.send(
"Nobody on this server has connected their Last.fm account yet!"
)

artist_map = {}
for member_data, member in data:
artists = member_data["artist"]
lowest_playcount = int(artists[-1]["playcount"])
highest_playcount = int(artists[0]["playcount"])
for artist in artists:
playcount = int(artist["playcount"])
score = playcount_mapped(
playcount,
input_start=lowest_playcount,
input_end=highest_playcount,
)

name = artist["name"]

try:
artist_map[name]["score"] += score
artist_map[name]["playcount"] += playcount
except KeyError:
artist_map[name] = {"score": score, "playcount": playcount}

top_artists = sorted(
artist_map.items(), key=lambda x: x[1]["score"], reverse=True
)[: size.count]

for i, (name, data) in enumerate(top_artists):
chart_nodes.append(
(
await self.api.get_artist_image(name),
f"<strong>{name}</strong></br>{data['score'] / len(data):.2f}%",
)
)
if topster:
if i > 0 and i % size.width == 0:
topster_labels.append(dict(text="</br>"))
topster_labels.append(dict(text=f"<li>{i+1}. {name}</li>"))

elif "recent" in args or "recents" in args:
chart_title = "recent tracks"

data = await self.task_for_each_server_member(
ctx.guild, self.api.user_get_recent_tracks, limit=size.count
)

if data is None:
return await ctx.send(
"Nobody on this server has connected their Last.fm account yet!"
)

track_list = []
for member_data, member in data:
tracks = member_data["track"]
for track in tracks:
artist_name = track["artist"]["#text"]
track_name = track["name"]
image = track["image"][0]["#text"]
try:
when = int(track["date"]["uts"])
except KeyError:
when = sys.maxsize
track_list.append([when, artist_name, track_name, image])

for i, (when, artist_name, track_name, image) in enumerate(
sorted(track_list, key=lambda x: x[0], reverse=True)[: size.count]
):
chart_nodes.append(
(
LastFmImage.from_url(image),
f"<strong>{track_name}</br>{artist_name}</strong>",
)
)
if topster:
if i > 0 and i % size.width == 0:
topster_labels.append(dict(text="</br>"))
topster_labels.append(
dict(text=f"<li>{i+1}. {artist_name}{track_name}</li>")
)

else:
chart_title = "top album"

data = await self.task_for_each_server_member(
ctx.guild, self.api.user_get_top_albums, limit=100, period=timeframe
)

if data is None:
return await ctx.send(
"Nobody on this server has connected their Last.fm account yet!"
)

album_map = {}
for member_data, member in data:
albums = member_data["album"]
lowest_playcount = int(albums[-1]["playcount"])
highest_playcount = int(albums[0]["playcount"])
for album in albums:
playcount = int(album["playcount"])
score = playcount_mapped(
playcount,
input_start=lowest_playcount,
input_end=highest_playcount,
)

name = f"{album['artist']['name']}{album['name']}"
try:
album_map[name]["score"] += score
album_map[name]["playcount"] += playcount
except KeyError:
album_map[name] = {
"score": score,
"playcount": playcount,
"image": album["image"][0]["#text"],
}

top_albums = sorted(
album_map.items(), key=lambda x: x[1]["score"], reverse=True
)[: size.count]

for i, (name, data) in enumerate(top_albums):
chart_nodes.append(
(
LastFmImage.from_url(data["image"]),
f"<strong>{name}</strong></br>{data['score'] / len(data):.2f}%",
)
)
if topster:
if i > 0 and i % size.width == 0:
topster_labels.append(dict(text="</br>"))
topster_labels.append(dict(text=f"<li>{i+1}. {name}</li>"))

buffer = await self.chart_factory(
chart_nodes,
size,
hide_labels="notitle" in args or topster,
use_padding="padded" in args,
topster_labels=topster_labels,
)

caption = (
f"**{ctx.guild.name} | "
f"{f'{timeframe.display().lower()} ' if 'recent' not in args else ''}"
f"{size} {chart_title} collage**"
)
filename = (
f"miso_collage_{ctx.guild.name}_"
f"{timeframe}_{arrow.now().int_timestamp}.jpg"
)

await ctx.send(caption, file=discord.File(fp=buffer, filename=filename))

async def user_ranking(
self, ctx: MisoContext, playcount_fn: Callable, ranking_of: str
):
Expand Down

0 comments on commit 9fbfbdd

Please sign in to comment.