diff --git a/cogs/fm.py b/cogs/fm.py index e8db9f6..8681de7 100644 --- a/cogs/fm.py +++ b/cogs/fm.py @@ -7,6 +7,7 @@ import math import os import re +import sys import urllib.parse from dataclasses import dataclass from enum import Enum, auto @@ -208,6 +209,11 @@ def __init__(self, bot): self.bot: MisoBot = bot self.api = LastFmApi(bot) + async def cog_load(self): + await self.api.login( + self.bot.keychain.LASTFM_USERNAME, self.bot.keychain.LASTFM_PASSWORD + ) + @commands.group(aliases=["lastfm", "lfm", "lf"]) async def fm(self, ctx: MisoContext): """Interact with LastFM using your linked account""" @@ -584,7 +590,7 @@ async def recent(self, ctx: MisoContext): try: timestamp = f'' except KeyError: - timestamp = discord.utils.format_dt(arrow.now().datetime, "R") + timestamp = ":notes:" rows.append(f'**{track["artist"]["#text"]} — {track["name"]}** {timestamp}') await self.paginated_user_stat_embed( @@ -674,7 +680,7 @@ async def artist_overview(self, ctx: MisoContext, timeframe: Period, artist: str f"https://last.fm/user/{ctx.lfm.username}/library/music/" f"{artist_url_format}?date_preset={timeframe.web_format()}" ) - soup = await self.api.scrape_page(url, authenticated=True) + soup = await self.api.scrape_page(url) try: chartlists = soup.select(".chartlist") @@ -805,7 +811,7 @@ async def artist_top( f"https://last.fm/user/{ctx.lfm.username}/library/music/" f"{artist_url_format}/+{data_type}?date_preset={timeframe.web_format()}" ) - soup = await self.api.scrape_page(url, authenticated=True) + soup = await self.api.scrape_page(url) formatted_name = artistinfo["name"] row_items = self.api.get_library_playcounts(soup) @@ -1221,7 +1227,7 @@ async def server_nowplaying(self, ctx: MisoContext): async def server_recent(self, ctx: MisoContext): """What this server has recently listened to""" data = await self.task_for_each_server_member( - ctx.guild, self.api.user_get_recent_tracks + ctx.guild, self.api.user_get_recent_tracks, limit=10 ) if data is None: @@ -1229,32 +1235,38 @@ async def server_recent(self, ctx: MisoContext): "Nobody on this server has connected their Last.fm account yet!" ) - rows = [] + track_list = [] for member_data, member in data: - suffix = "" - if member_data["nowplaying"]: - suffix = " :musical_note:" + tracks = member_data["track"] + for track in tracks: + artist_name = track["artist"]["#text"] + track_name = track["name"] + try: + when = int(track["date"]["uts"]) + except KeyError: + when = sys.maxsize - artist_name = member_data["artist"]["#text"] - track_name = member_data["name"] + track_list.append([when, artist_name, track_name, member]) + if not track_list: + return await ctx.send("Nobody on this server has listened to anything!") + + track_list = sorted(track_list, key=lambda x: x[0], reverse=True) + + rows = [] + for when, artist_name, track_name, member in track_list: + if when == sys.maxsize: + suffix = ":notes:" + else: + suffix = f"" rows.append( f"{util.displayname(member)} | **{escape_markdown(artist_name)}** " - f"— ***{escape_markdown(track_name)}***{suffix}" + f"— ***{escape_markdown(track_name)}*** {suffix}" ) - if not rows: - return await ctx.send("Nobody on this server has listened to anything!") - - content = ( - discord.Embed(color=int(self.LASTFM_RED, 16)) - .set_author( - name=f"What has {ctx.guild.name} been listening to?", - icon_url=ctx.guild.icon, - ) - .set_footer( - text=f"{len(rows)} / {len(data)} Members are listening to music right now" - ) + content = discord.Embed(color=int(self.LASTFM_RED, 16)).set_author( + name=f"What has {ctx.guild.name} been listening to?", + icon_url=ctx.guild.icon, ) await RowPaginator(content, rows).run(ctx) @@ -1353,7 +1365,11 @@ async def server_toptracks( track_map[name]["score"] += score track_map[name]["playcount"] += playcount except KeyError: - track_map[name] = {"score": score, "playcount": playcount} + track_map[name] = { + "score": score, + "playcount": playcount, + "url": track["url"], + } top_tracks = sorted( track_map.items(), key=lambda x: x[1]["score"], reverse=True @@ -1370,9 +1386,7 @@ async def server_toptracks( ctx, rows, f"Top 100 Tracks ({timeframe.display()})", - image=await self.api.get_artist_image( - top_tracks[0][0].split(" — ")[0].strip("*") - ), + image=await self.api.scrape_track_image(top_tracks[0][1]["url"]), footer=f"Score calculated from top 100 tracks of {len(data)} members", server_target=True, ) @@ -1414,7 +1428,11 @@ async def server_topalbums( album_map[name]["score"] += score album_map[name]["playcount"] += playcount except KeyError: - album_map[name] = {"score": score, "playcount": playcount} + album_map[name] = { + "score": score, + "playcount": playcount, + "image": album["image"][0]["#text"], + } top_albums = sorted( album_map.items(), key=lambda x: x[1]["score"], reverse=True @@ -1431,13 +1449,216 @@ async def server_topalbums( ctx, rows, f"Top 100 Tracks ({timeframe.display()})", - image=await self.api.get_artist_image( - top_albums[0][0].split(" — ")[0].strip("*") - ), + image=LastFmImage.from_url(top_albums[0][1]["image"]), footer=f"Score calculated from top 100 albums of {len(data)} members", server_target=True, ) + @server.command( + name="chart", + aliases=["collage"], + usage=( + "[timeframe] [[width]x[height]] " + "['album' | 'artist' | 'recent' | 'notitle' | 'topster' | 'padded']" + ), + ) + async def server_chart( + self, + ctx: MisoContext, + *args: Union[ + Annotated[Period, PeriodArgument], + Annotated[ChartSize, ChartSizeArgument], + ChartOption, + ], + ): + """ + Collage of your top albums or artists + + Usage: + >fm server chart [timeframe] [[width]x[height]] + ['album' | 'artist' | 'recent' | 'notitle' | 'topster' | 'padded']" + + Defaults to 3x3 weekly albums chart. + """ + timeframe = Period.WEEK + size = ChartSize(3, 3) + for arg in args: + if isinstance(arg, Period): + timeframe = arg + elif isinstance(arg, ChartSize): + if arg.width > 12 or arg.height > 12: + raise exceptions.CommandWarning( + "The maximum width/height of the collage is `12`" + ) + size = arg + + chart_nodes = [] + topster_labels = [] + topster = "topster" in args + if "artist" in args: + chart_title = "top artist" + data = await self.task_for_each_server_member( + ctx.guild, + self.api.user_get_top_artists, + limit=size.count, + period=timeframe, + ) + + if data is None: + return await ctx.send( + "Nobody on this server has connected their Last.fm account yet!" + ) + + artist_map = {} + for member_data, member in data: + artists = member_data["artist"] + lowest_playcount = int(artists[-1]["playcount"]) + highest_playcount = int(artists[0]["playcount"]) + for artist in artists: + playcount = int(artist["playcount"]) + score = playcount_mapped( + playcount, + input_start=lowest_playcount, + input_end=highest_playcount, + ) + + name = artist["name"] + + try: + artist_map[name]["score"] += score + artist_map[name]["playcount"] += playcount + except KeyError: + artist_map[name] = {"score": score, "playcount": playcount} + + top_artists = sorted( + artist_map.items(), key=lambda x: x[1]["score"], reverse=True + )[: size.count] + + for i, (name, data) in enumerate(top_artists): + chart_nodes.append( + ( + await self.api.get_artist_image(name), + f"{name}
{data['score'] / len(data):.2f}%", + ) + ) + if topster: + if i > 0 and i % size.width == 0: + topster_labels.append(dict(text="
")) + topster_labels.append(dict(text=f"
  • {i+1}. {name}
  • ")) + + elif "recent" in args or "recents" in args: + chart_title = "recent tracks" + + data = await self.task_for_each_server_member( + ctx.guild, self.api.user_get_recent_tracks, limit=size.count + ) + + if data is None: + return await ctx.send( + "Nobody on this server has connected their Last.fm account yet!" + ) + + track_list = [] + for member_data, member in data: + tracks = member_data["track"] + for track in tracks: + artist_name = track["artist"]["#text"] + track_name = track["name"] + image = track["image"][0]["#text"] + try: + when = int(track["date"]["uts"]) + except KeyError: + when = sys.maxsize + track_list.append([when, artist_name, track_name, image]) + + for i, (when, artist_name, track_name, image) in enumerate( + sorted(track_list, key=lambda x: x[0], reverse=True)[: size.count] + ): + chart_nodes.append( + ( + LastFmImage.from_url(image), + f"{track_name}
    {artist_name}
    ", + ) + ) + if topster: + if i > 0 and i % size.width == 0: + topster_labels.append(dict(text="
    ")) + topster_labels.append( + dict(text=f"
  • {i+1}. {artist_name} — {track_name}
  • ") + ) + + else: + chart_title = "top album" + + data = await self.task_for_each_server_member( + ctx.guild, self.api.user_get_top_albums, limit=100, period=timeframe + ) + + if data is None: + return await ctx.send( + "Nobody on this server has connected their Last.fm account yet!" + ) + + album_map = {} + for member_data, member in data: + albums = member_data["album"] + lowest_playcount = int(albums[-1]["playcount"]) + highest_playcount = int(albums[0]["playcount"]) + for album in albums: + playcount = int(album["playcount"]) + score = playcount_mapped( + playcount, + input_start=lowest_playcount, + input_end=highest_playcount, + ) + + name = f"{album['artist']['name']} — {album['name']}" + try: + album_map[name]["score"] += score + album_map[name]["playcount"] += playcount + except KeyError: + album_map[name] = { + "score": score, + "playcount": playcount, + "image": album["image"][0]["#text"], + } + + top_albums = sorted( + album_map.items(), key=lambda x: x[1]["score"], reverse=True + )[: size.count] + + for i, (name, data) in enumerate(top_albums): + chart_nodes.append( + ( + LastFmImage.from_url(data["image"]), + f"{name}
    {data['score'] / len(data):.2f}%", + ) + ) + if topster: + if i > 0 and i % size.width == 0: + topster_labels.append(dict(text="
    ")) + topster_labels.append(dict(text=f"
  • {i+1}. {name}
  • ")) + + buffer = await self.chart_factory( + chart_nodes, + size, + hide_labels="notitle" in args or topster, + use_padding="padded" in args, + topster_labels=topster_labels, + ) + + caption = ( + f"**{ctx.guild.name} | " + f"{f'{timeframe.display().lower()} ' if 'recent' not in args else ''}" + f"{size} {chart_title} collage**" + ) + filename = ( + f"miso_collage_{ctx.guild.name}_" + f"{timeframe}_{arrow.now().int_timestamp}.jpg" + ) + + await ctx.send(caption, file=discord.File(fp=buffer, filename=filename)) + async def user_ranking( self, ctx: MisoContext, playcount_fn: Callable, ranking_of: str ):