|
| 1 | +# Copyright (c) 2024 iiPython |
| 2 | + |
| 3 | +# Modules |
| 4 | +import typing |
| 5 | +from base64 import b64encode |
| 6 | +from urllib.parse import quote_plus |
| 7 | +from platform import python_version |
| 8 | + |
| 9 | +from requests import Session |
| 10 | + |
| 11 | +from nightwatch import __version__ |
| 12 | +from nightwatch.bot import Client, Context |
| 13 | +from nightwatch.bot.client import AuthorizationFailed |
| 14 | + |
| 15 | +# Handle now playing |
| 16 | +session = Session() |
| 17 | + |
| 18 | +def get_spotify_access() -> str: |
| 19 | + with session.post( |
| 20 | + "https://accounts.spotify.com/api/token", |
| 21 | + data = "grant_type=client_credentials", |
| 22 | + headers = { |
| 23 | + "Authorization": f"Basic {b64encode(b'3f974573800a4ff5b325de9795b8e603:ff188d2860ff44baa57acc79c121a3b9').decode()}", |
| 24 | + "Content-Type": "application/x-www-form-urlencoded" |
| 25 | + } |
| 26 | + ) as response: |
| 27 | + return response.json()["access_token"] |
| 28 | + |
| 29 | +def get_now_playing() -> tuple[dict | None, str | None]: |
| 30 | + with session.get("https://api.listenbrainz.org/1/user/iiPython/playing-now") as response: |
| 31 | + result = (response.json())["payload"]["listens"] |
| 32 | + |
| 33 | + if not result: |
| 34 | + return None, None |
| 35 | + |
| 36 | + result = result[0] |
| 37 | + |
| 38 | + # Reorganize the result data |
| 39 | + tm = result["track_metadata"] |
| 40 | + result = {"artist": tm["artist_name"], "track": tm["track_name"], "album": tm["release_name"],} |
| 41 | + with session.get( |
| 42 | + f"https://api.spotify.com/v1/search?q={quote_plus(f'{result['artist']} {result['album']}')}&type=album&limit=1", |
| 43 | + headers = { |
| 44 | + "Authorization": f"Bearer {get_spotify_access()}", |
| 45 | + "Content-Type": "application/x-www-form-urlencoded" |
| 46 | + } |
| 47 | + ) as response: |
| 48 | + images = (response.json())["albums"]["items"] |
| 49 | + return result, images[0]["images"][-1]["url"] |
| 50 | + |
| 51 | +# Helping methods |
| 52 | +def dominant_color(hex: str) -> str: |
| 53 | + r, g, b = tuple(int(hex[i:i + 2], 16) for i in (0, 2, 4)) |
| 54 | + if r > g and r > b: |
| 55 | + return "red(ish)" |
| 56 | + |
| 57 | + elif g > r and g > b: |
| 58 | + return "green(ish)" |
| 59 | + |
| 60 | + elif b > r and b > g: |
| 61 | + return "blue(ish)" |
| 62 | + |
| 63 | + elif r == g and r > b: |
| 64 | + return "yellow(ish)" |
| 65 | + |
| 66 | + elif r == b and r > g: |
| 67 | + return "magenta(ish)" |
| 68 | + |
| 69 | + elif g == b and g > r: |
| 70 | + return "cyan(ish)" |
| 71 | + |
| 72 | + return "gray(ish)" |
| 73 | + |
| 74 | +# Create client |
| 75 | +class NextgenerationBot(Client): |
| 76 | + def __init__(self) -> None: |
| 77 | + super().__init__() |
| 78 | + |
| 79 | + # Extra data |
| 80 | + self.send_on_join = None |
| 81 | + |
| 82 | + async def rejoin(self, username: typing.Optional[str] = None, hex: typing.Optional[str] = None) -> None: |
| 83 | + await self.close() |
| 84 | + await self.event_loop(username or self.user.name, hex or self.user.hex, self.address) # type: ignore |
| 85 | + |
| 86 | + async def on_connect(self, ctx: Context) -> None: |
| 87 | + print(f"Connected to '{ctx.rics.name}'!") |
| 88 | + |
| 89 | + async def on_message(self, ctx: Context) -> None: |
| 90 | + if self.send_on_join is not None: |
| 91 | + await ctx.send(self.send_on_join) |
| 92 | + self.send_on_join = None |
| 93 | + return |
| 94 | + |
| 95 | + command = ctx.message.message |
| 96 | + if command[0] != "/": |
| 97 | + return |
| 98 | + |
| 99 | + match command[1:].split(" "): |
| 100 | + case ["help"]: |
| 101 | + await ctx.reply("Commands: /help, /music, /user, /people, /rename, /set-hex, /version") |
| 102 | + |
| 103 | + case ["music"]: |
| 104 | + data, image = get_now_playing() |
| 105 | + if not (data and image): |
| 106 | + return await ctx.reply("iiPython isn't listening to anything right now.") |
| 107 | + |
| 108 | + await ctx.send(f"iiPython is listening to {data['track']} by {data['artist']} (on {data['album']}).") |
| 109 | + await ctx.send(f"![{data['track']} by {data['artist']} cover art]({image})") |
| 110 | + |
| 111 | + case ["user", *username]: |
| 112 | + client = next(filter(lambda u: u.name == " ".join(username), ctx.rics.users), None) |
| 113 | + if client is None: |
| 114 | + return await ctx.reply("Specified user doesn't *fucking* exist.") |
| 115 | + |
| 116 | + await ctx.send(f"**Name:** {'🤖 ' if client.bot else '★ ' if client.admin else ''}{client.name} | **HEX Code:** #{client.hex} [{dominant_color(client.hex)}]") |
| 117 | + |
| 118 | + case ["rename" | "set-hex" as command, *response]: |
| 119 | + try: |
| 120 | + await self.rejoin( |
| 121 | + " ".join(response) if command == "rename" else None, |
| 122 | + response[0] if command == "set-hex" else None |
| 123 | + ) |
| 124 | + |
| 125 | + except AuthorizationFailed as problem: |
| 126 | + if problem.json is not None: |
| 127 | + message = (problem.json.get("message") or problem.json["detail"][0]["msg"]).rstrip(".").lower() |
| 128 | + self.send_on_join = f"Failed to switch {'username' if command == 'rename' else 'hex code'} because '{message}'." |
| 129 | + |
| 130 | + await self.event_loop(self.user.name, self.user.hex, self.address) # type: ignore |
| 131 | + |
| 132 | + case ["people"]: |
| 133 | + await ctx.send(f"There are {len(ctx.rics.users)} users: {', '.join(f'{u.name}{f' ({'admin' if u.admin else 'bot'})' if u.admin or u.bot else ''}' for u in ctx.rics.users)}") |
| 134 | + |
| 135 | + case ["version"]: |
| 136 | + await ctx.reply(f"Running on Nightwatch v{__version__} using Python {python_version()}.") |
| 137 | + |
| 138 | + case _: |
| 139 | + await ctx.reply("I have **no idea** what the *fuck* you just asked...") |
| 140 | + |
| 141 | +NextgenerationBot().run( |
| 142 | + username = "Pizza Eater", |
| 143 | + hex = "ff0000", |
| 144 | + address = "nightwatch.k4ffu.dev" |
| 145 | +) |
0 commit comments