# MarkovChainBot.py
from typing import List, Tuple
from TwitchWebsocket import Message, TwitchWebsocket
from nltk.tokenize import sent_tokenize
import socket, time, logging, re, string
from Settings import Settings, SettingsData
from Database import Database
from Timer import LoopingTimer
from Tokenizer import detokenize, tokenize
from Log import Log
# Initialise the project's logging helper for this script
# (presumably configures handlers/formatting - confirm in the Log module).
Log(__file__)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
class MarkovChain:
    """Twitch chat bot that learns from chat messages and generates sentences.

    Constructing an instance loads the blacklist and the settings, opens the
    database for the configured channel, optionally starts the periodic help /
    automatic generation timers, and finally starts the websocket bot with
    `message_handler` as its callback.
    """

    def __init__(self):
        """Load blacklist and settings, start the optional timers, and start the bot.

        Raises:
            ValueError: If "HelpMessageTimer" is positive but below 300 seconds, or
                "AutomaticGenerationTimer" is positive but below 30 seconds.
        """
        # Timestamp of the last successful generation, used for the cooldown
        self.prev_message_t = 0
        # Whether the generate command is currently enabled (!enable / !disable)
        self._enabled = True
        # This regex should detect similar phrases as links as Twitch does.
        # Raw string so that `\w` and `\.` are not treated as (invalid) string escapes.
        self.link_regex = re.compile(r"\w+\.[a-z]{2,}")
        # List of moderators used in blacklist modification, includes broadcaster
        self.mod_list = []
        self.set_blacklist()

        # Fill previously initialised variables with data from the settings.txt file
        # (Settings is expected to call back into `set_settings` - confirm in Settings module)
        Settings(self)
        self.db = Database(self.chan)

        # Set up daemon Timer to send help messages
        if self.help_message_timer > 0:
            if self.help_message_timer < 300:
                raise ValueError("Value for \"HelpMessageTimer\" must be at least 300 seconds, or a negative number for no help messages.")
            t = LoopingTimer(self.help_message_timer, self.send_help_message)
            t.start()

        # Set up daemon Timer to send automatic generation messages
        if self.automatic_generation_timer > 0:
            if self.automatic_generation_timer < 30:
                raise ValueError("Value for \"AutomaticGenerationTimer\" must be at least 30 seconds, or a negative number for no automatic generations.")
            t = LoopingTimer(self.automatic_generation_timer, self.send_automatic_generation_message)
            t.start()

        self.ws = TwitchWebsocket(host=self.host,
                                  port=self.port,
                                  chan=self.chan,
                                  nick=self.nick,
                                  auth=self.auth,
                                  callback=self.message_handler,
                                  capability=["commands", "tags"],
                                  live=True)
        self.ws.start_bot()

    def set_settings(self, settings: "SettingsData"):
        """Fill class instance attributes based on the settings file.

        Args:
            settings (SettingsData): The settings dict with information from the settings file.
        """
        self.host = settings["Host"]
        self.port = settings["Port"]
        self.chan = settings["Channel"]
        self.nick = settings["Nickname"]
        self.auth = settings["Authentication"]
        # The bot's own nickname is always denied, so it never reacts to its own messages
        self.denied_users = [user.lower() for user in settings["DeniedUsers"]] + [self.nick.lower()]
        self.allowed_users = [user.lower() for user in settings["AllowedUsers"]]
        self.cooldown = settings["Cooldown"]
        self.key_length = settings["KeyLength"]
        self.max_sentence_length = settings["MaxSentenceWordAmount"]
        self.min_sentence_length = settings["MinSentenceWordAmount"]
        self.help_message_timer = settings["HelpMessageTimer"]
        self.automatic_generation_timer = settings["AutomaticGenerationTimer"]
        self.whisper_cooldown = settings["WhisperCooldown"]
        self.enable_generate_command = settings["EnableGenerateCommand"]
        self.sent_separator = settings["SentenceSeparator"]
        self.allow_generate_params = settings["AllowGenerateParams"]
        self.generate_commands = tuple(settings["GenerateCommands"])

    def message_handler(self, m: "Message") -> None:
        """Handle one incoming message from the websocket (the bot's callback).

        Dispatches on `m.type`:
            - "366": channel joined; requests the moderator list.
            - "NOTICE": parses the moderator list response, otherwise logs the notice.
            - "PRIVMSG" / "WHISPER": privileged commands (!enable, !disable, !setcooldown).
            - "PRIVMSG": generate / help commands, otherwise learns from the message.
            - "WHISPER": !nopm / !yespm and blacklist management.
            - "CLEARMSG": unlearns the deleted message.

        Any exception raised while handling is logged and swallowed, so one bad
        message does not propagate out of the callback.

        Args:
            m (Message): The Message object that was sent from Twitch.
        """
        try:
            if m.type == "366":
                logger.info(f"Successfully joined channel: #{m.channel}")
                # Get the list of mods used for modifying the blacklist
                logger.info("Fetching mod list...")
                self.ws.send_message("/mods")

            elif m.type == "NOTICE":
                # Check whether the NOTICE is a response to our /mods request
                if m.message.startswith("The moderators of this channel are:"):
                    string_list = m.message.replace("The moderators of this channel are:", "").strip()
                    self.mod_list = [m.channel] + string_list.split(", ")
                    logger.info(f"Fetched mod list. Found {len(self.mod_list) - 1} mods.")
                elif m.message == "There are no moderators of this channel.":
                    self.mod_list = [m.channel]
                    logger.info("Fetched mod list. Found no mods.")
                # If it is not, log this NOTICE
                else:
                    logger.info(m.message)

            elif m.type in ("PRIVMSG", "WHISPER"):
                # Privileged commands, available both in chat and via whisper
                if m.message.startswith("!enable") and self.check_if_permissions(m):
                    if self._enabled:
                        self.ws.send_whisper(m.user, "The generate command is already enabled.")
                    else:
                        self.ws.send_whisper(m.user, "Users can now use generate command again.")
                        self._enabled = True
                        logger.info("Users can now use generate command again.")

                elif m.message.startswith("!disable") and self.check_if_permissions(m):
                    if self._enabled:
                        self.ws.send_whisper(m.user, "Users can now no longer use generate command.")
                        self._enabled = False
                        logger.info("Users can now no longer use generate command.")
                    else:
                        self.ws.send_whisper(m.user, "The generate command is already disabled.")

                elif m.message.startswith(("!setcooldown", "!setcd")) and self.check_if_permissions(m):
                    split_message = m.message.split(" ")
                    if len(split_message) == 2:
                        try:
                            cooldown = int(split_message[1])
                        except ValueError:
                            self.ws.send_whisper(m.user, "The parameter must be an integer amount, eg: !setcd 30")
                            return
                        self.cooldown = cooldown
                        Settings.update_cooldown(cooldown)
                        self.ws.send_whisper(m.user, f"The !generate cooldown has been set to {cooldown} seconds.")
                    else:
                        self.ws.send_whisper(m.user, "Please add exactly 1 integer parameter, eg: !setcd 30.")

            # Second dispatch: this chain also runs for PRIVMSG / WHISPER messages
            # that were already inspected for privileged commands above.
            if m.type == "PRIVMSG":

                # Ignore bot messages
                if m.user.lower() in self.denied_users:
                    return

                if self.check_if_generate(m.message):
                    if not self.enable_generate_command and not self.check_if_permissions(m):
                        return

                    if not self._enabled:
                        if not self.db.check_whisper_ignore(m.user):
                            self.send_whisper(m.user, "The !generate has been turned off. !nopm to stop me from whispering you.")
                        return

                    cur_time = time.time()
                    # Privileged users bypass the cooldown
                    if self.prev_message_t + self.cooldown < cur_time or self.check_if_permissions(m):
                        if self.check_filter(m.message):
                            sentence = "You can't make me say that, you madman!"
                        else:
                            # Skip the first two tokens (presumably "!" and the command
                            # name - confirm against Tokenizer.tokenize)
                            params = tokenize(m.message)[2:] if self.allow_generate_params else None
                            # Generate an actual sentence
                            sentence, success = self.generate(params)
                            if success:
                                # Reset cooldown if a message was actually generated
                                self.prev_message_t = time.time()
                        logger.info(sentence)
                        self.ws.send_message(sentence)
                    else:
                        if not self.db.check_whisper_ignore(m.user):
                            self.send_whisper(m.user, f"Cooldown hit: {self.prev_message_t + self.cooldown - cur_time:0.2f} out of {self.cooldown:.0f}s remaining. !nopm to stop these cooldown pm's.")
                        logger.info(f"Cooldown hit with {self.prev_message_t + self.cooldown - cur_time:0.2f}s remaining.")
                    # Never learn from a generate command
                    return

                # Send help message when requested.
                elif m.message.startswith(("!ghelp", "!genhelp", "!generatehelp")):
                    self.send_help_message()

                # Ignore the message if it is deemed a command
                elif self.check_if_other_command(m.message):
                    return

                # Ignore the message if it contains a link.
                elif self.check_link(m.message):
                    return

                if "emotes" in m.tags:
                    # If the list of emotes contains "emotesv2_", then the message contains a bit emote,
                    # and we choose not to learn from those messages.
                    if "emotesv2_" in m.tags["emotes"]:
                        return

                    # Replace modified emotes with normal versions,
                    # as the bot will never have the modified emotes unlocked at the time.
                    for modifier in self.extract_modifiers(m.tags["emotes"]):
                        m.message = m.message.replace(modifier, "")

                # Ignore the message if any word in the sentence is on the ban filter
                if self.check_filter(m.message):
                    logger.warning(f"Sentence contained blacklisted word or phrase:\"{m.message}\"")
                    return

                else:
                    # Try to split up sentences. Requires nltk's 'punkt' resource
                    try:
                        sentences = sent_tokenize(m.message.strip())
                    # If 'punkt' is not downloaded, then download it, and retry
                    except LookupError:
                        logger.debug("Downloading required punkt resource...")
                        import nltk
                        nltk.download('punkt')
                        logger.debug("Downloaded required punkt resource.")
                        sentences = sent_tokenize(m.message.strip())

                    for sentence in sentences:
                        # Get all separate words
                        words = tokenize(sentence)
                        # Double spaces will lead to invalid rules. We remove empty words here
                        if "" in words:
                            words = [word for word in words if word]

                        # If the sentence is too short, ignore it and move on to the next.
                        if len(words) <= self.key_length:
                            continue

                        # Add a new starting point for a sentence: the first `key_length` words
                        self.db.add_start_queue(words[:self.key_length])

                        # Sliding window of the last `key_length` words, used as the key for the grammar
                        key = list()
                        for word in words:
                            # Set up key for first use
                            if len(key) < self.key_length:
                                key.append(word)
                                continue

                            self.db.add_rule_queue(key + [word])

                            # Remove the first word, and add the current word,
                            # so that the key is correct for the next word.
                            key.pop(0)
                            key.append(word)
                        # Add <END> at the end of the sentence
                        self.db.add_rule_queue(key + ["<END>"])

            elif m.type == "WHISPER":
                # Allow people to whisper the bot to disable or enable whispers.
                if m.message == "!nopm":
                    logger.debug(f"Adding {m.user} to Do Not Whisper.")
                    self.db.add_whisper_ignore(m.user)
                    self.ws.send_whisper(m.user, "You will no longer be sent whispers. Type !yespm to reenable. ")

                elif m.message == "!yespm":
                    logger.debug(f"Removing {m.user} from Do Not Whisper.")
                    self.db.remove_whisper_ignore(m.user)
                    self.ws.send_whisper(m.user, "You will again be sent whispers. Type !nopm to disable again. ")

                # Note that I add my own username to this list to allow me to manage the
                # blacklist in channels of my bot in channels I am not modded in.
                # I may modify this and add a "allowed users" field in the settings file.
                elif m.user.lower() in self.mod_list + ["cubiedev"] + self.allowed_users:
                    # Adding to the blacklist
                    if self.check_if_our_command(m.message, "!blacklist"):
                        if len(m.message.split()) == 2:
                            # TODO: Remove newly blacklisted word from the Database
                            word = m.message.split()[1].lower()
                            # Avoid duplicate entries: a duplicate would survive a later
                            # single `!whitelist` and keep the word banned.
                            if word not in self.blacklist:
                                self.blacklist.append(word)
                                self.write_blacklist(self.blacklist)
                            logger.info(f"Added `{word}` to Blacklist.")
                            self.ws.send_whisper(m.user, "Added word to Blacklist.")
                        else:
                            self.ws.send_whisper(m.user, "Expected Format: `!blacklist word` to add `word` to the blacklist")

                    # Removing from the blacklist
                    elif self.check_if_our_command(m.message, "!whitelist"):
                        if len(m.message.split()) == 2:
                            word = m.message.split()[1].lower()
                            if word in self.blacklist:
                                # Drop every occurrence, in case blacklist.txt already
                                # contains the word more than once.
                                self.blacklist = [entry for entry in self.blacklist if entry != word]
                                logger.info(f"Removed `{word}` from Blacklist.")
                                self.write_blacklist(self.blacklist)
                                self.ws.send_whisper(m.user, "Removed word from Blacklist.")
                            else:
                                self.ws.send_whisper(m.user, "Word was already not in the blacklist.")
                        else:
                            self.ws.send_whisper(m.user, "Expected Format: `!whitelist word` to remove `word` from the blacklist.")

                    # Checking whether a word is in the blacklist
                    elif self.check_if_our_command(m.message, "!check"):
                        if len(m.message.split()) == 2:
                            word = m.message.split()[1].lower()
                            if word in self.blacklist:
                                self.ws.send_whisper(m.user, "This word is in the Blacklist.")
                            else:
                                self.ws.send_whisper(m.user, "This word is not in the Blacklist.")
                        else:
                            self.ws.send_whisper(m.user, "Expected Format: `!check word` to check whether `word` is on the blacklist.")

            elif m.type == "CLEARMSG":
                # If a message is deleted, its contents will be unlearned.
                # Per the original author's note, the "occurrences" attribute of each
                # combination of words in the sentence is reduced by 5, and deleted if it
                # drops below 1 (see Database.unlearn for the actual behaviour).
                self.db.unlearn(m.message)

                # TODO: Think of some efficient way to check whether it was our message that got deleted.
                # If the bot's message was deleted, log this as an error
                #if m.user.lower() == self.nick.lower():
                #    logger.error(f"This bot message was deleted: \"{m.message}\"")

        except Exception as e:
            # Top-level boundary of the callback: log with traceback and carry on
            logger.exception(e)

    def generate(self, params: List[str] = None) -> "Tuple[str, bool]":
        """Given an input sentence, generate the remainder of the sentence using the learned data.

        Args:
            params (List[str]): A list of words to use as an input to use as the start of generating.
                Defaults to None, which is treated as an empty list.

        Returns:
            Tuple[str, bool]: A tuple of a sentence as the first value, and a boolean indicating
                whether the generation succeeded as the second value.
        """
        if params is None:
            params = []

        # List of sentences that will be generated. In some cases, multiple sentences will be generated,
        # e.g. when the first sentence has less words than self.min_sentence_length.
        sentences = [[]]

        # Check for commands or recursion, eg: !generate !generate
        if len(params) > 0:
            if self.check_if_other_command(params[0]):
                return "You can't make me do commands, you madman!", False

        # Get the starting key and starting sentence.
        # If there is more than 1 param, get the last 2 as the key.
        # Note that self.key_length is fixed to 2 in this implementation
        if len(params) > 1:
            # The slice is a new list, so shifting `key` below never mutates `params`
            key = params[-self.key_length:]
            # Copy the entire params for the sentence
            sentences[0] = params.copy()

        elif len(params) == 1:
            # First we try to find if this word was once used as the first word in a sentence:
            key = self.db.get_next_single_start(params[0])
            if key is None:
                # If this failed, we try to find the next word in the grammar as a whole
                key = self.db.get_next_single_initial(0, params[0])
                if key is None:
                    # Return a message that this word hasn't been learned yet
                    return f"I haven't extracted \"{params[0]}\" from chat yet.", False
            # Copy this for the sentence
            sentences[0] = key.copy()

        else: # if there are no params
            # Get starting key
            key = self.db.get_start()
            if key:
                # Copy this for the sentence
                sentences[0] = key.copy()
            else:
                # If nothing's ever been said
                return "There is not enough learned information yet.", False

        # Counter to prevent infinite loops (i.e. constantly generating <END> while below the
        # minimum number of words to generate)
        i = 0
        while self.sentence_length(sentences) < self.max_sentence_length and i < self.max_sentence_length * 2:
            # Use key to get next word
            if i == 0:
                # Prevent fetching <END> on the first word
                word = self.db.get_next_initial(i, key)
            else:
                word = self.db.get_next(i, key)

            i += 1

            if word == "<END>" or word is None:
                # Break, unless we are before the min_sentence_length
                if i < self.min_sentence_length:
                    key = self.db.get_start()
                    # Ensure that the key can be generated. Otherwise we still stop.
                    if key:
                        # Start a new sentence
                        sentences.append([])
                        for entry in key:
                            sentences[-1].append(entry)
                        continue
                break

            # Otherwise add the word
            sentences[-1].append(word)

            # Shift the key so on the next iteration it gets the next item
            key.pop(0)
            key.append(word)

        # If there were params, but the resulting sentence is identical to the params,
        # then the params did not result in an actual sentence: report that as a failure.
        if len(params) > 0 and params == sentences[0]:
            return "I haven't learned what to do with \"" + detokenize(params[-self.key_length:]) + "\" yet.", False

        return self.sent_separator.join(detokenize(sentence) for sentence in sentences), True

    def sentence_length(self, sentences: List[List[str]]) -> int:
        """Given a list of tokens representing a sentence, return the number of words in there.

        Args:
            sentences (List[List[str]]): List of lists of tokens that make up a sentence,
                where a token is a word or punctuation. For example:
                [['Hello', ',', 'you', "'re", 'Tom', '!'], ['Yes', ',', 'I', 'am', '.']]
                This would return 6.

        Returns:
            int: The number of words in the sentence.
        """
        # A token counts as a word unless it occurs (as a substring) in
        # string.punctuation or starts with an apostrophe (e.g. "'re").
        # The punctuation test comes first so an empty token never reaches token[0].
        return sum(
            1
            for sentence in sentences
            for token in sentence
            if token not in string.punctuation and token[0] != "'"
        )

    def extract_modifiers(self, emotes: str) -> List[str]:
        """Extract emote modifiers from emotes, such as the horizontal flip.

        Args:
            emotes (str): String containing all emotes used in the message.

        Returns:
            List[str]: List of strings that show modifiers, such as "_HZ" for horizontal flip.
        """
        output = []
        try:
            while emotes:
                # Each modifier runs from an underscore up to (excluding) the next colon
                u_index = emotes.index("_")
                c_index = emotes.index(":", u_index)
                output.append(emotes[u_index:c_index])
                emotes = emotes[c_index:]
        except ValueError:
            # `index` found no further "_" or ":": nothing left to extract
            pass
        return output

    def write_blacklist(self, blacklist: List[str]) -> None:
        """Write blacklist.txt given a list of banned words.

        The words are written one per line, longest first.

        Args:
            blacklist (List[str]): The list of banned words to write.
        """
        logger.debug("Writing Blacklist...")
        with open("blacklist.txt", "w") as f:
            f.write("\n".join(sorted(blacklist, key=len, reverse=True)))
        logger.debug("Written Blacklist.")

    def set_blacklist(self) -> None:
        """Read blacklist.txt and set `self.blacklist` to the list of banned words.

        If the file does not exist, a default blacklist is used and written to disk.
        """
        logger.debug("Loading Blacklist...")
        try:
            with open("blacklist.txt", "r") as f:
                self.blacklist = [l.replace("\n", "") for l in f.readlines()]
                logger.debug("Loaded Blacklist.")

        except FileNotFoundError:
            logger.warning("Loading Blacklist Failed!")
            self.blacklist = ["<start>", "<end>"]
            self.write_blacklist(self.blacklist)

    def send_help_message(self) -> None:
        """Send a Help message to the connected chat, as long as the bot wasn't disabled."""
        if not self._enabled:
            return
        try:
            self.ws.send_message("Learn how this bot generates sentences here: https://github.com/CubieDev/TwitchMarkovChain#how-it-works")
        except OSError as error:
            # Best effort: a failed send must not raise out of the caller
            logger.warning(f"[OSError: {error}] upon sending help message. Ignoring.")
        else:
            logger.info("Help message sent.")

    def send_automatic_generation_message(self) -> None:
        """Send an automatic generation message to the connected chat.

        As long as the bot wasn't disabled, just like if someone typed "!g" in chat.
        """
        if not self._enabled:
            return
        sentence, success = self.generate()
        if success:
            logger.info(sentence)
            # Try to send a message. Just log a warning on fail
            try:
                self.ws.send_message(sentence)
            except OSError as error:
                logger.warning(f"[OSError: {error}] upon sending automatic generation message. Ignoring.")
        else:
            logger.info("Attempted to output automatic generation message, but there is not enough learned information yet.")

    def send_whisper(self, user: str, message: str) -> None:
        """Optionally send a whisper, only if "WhisperCooldown" is True.

        Args:
            user (str): The user to potentially whisper.
            message (str): The message to potentially whisper
        """
        if self.whisper_cooldown:
            self.ws.send_whisper(user, message)

    def check_filter(self, message: str) -> bool:
        """Returns True if message contains a banned word.

        Args:
            message (str): The message to check.

        Returns:
            bool: True if any lowercased token of the message is in the blacklist.
        """
        return any(word.lower() in self.blacklist for word in tokenize(message))

    def check_if_our_command(self, message: str, *commands: str) -> bool:
        """True if the first "word" of the message is in the tuple of commands

        Args:
            message (str): The message to check for a command.
            commands (str): The commands to check against, passed as separate arguments.

        Returns:
            bool: True if the first word in message is one of the commands.
                False for an empty or whitespace-only message.
        """
        words = message.split()
        # An empty message has no first word; treat it as "not a command"
        # instead of raising IndexError.
        return bool(words) and words[0] in commands

    def check_if_generate(self, message: str) -> bool:
        """True if the first "word" of the message is one of the defined generate commands.

        Args:
            message (str): The message to check for the generate command (i.e !generate or !g).

        Returns:
            bool: True if the first word in message is a generate command.
        """
        return self.check_if_our_command(message, *self.generate_commands)

    def check_if_other_command(self, message: str) -> bool:
        """True if the message is any command, except /me.

        Is used to avoid learning and generating commands.

        Args:
            message (str): The message to check.

        Returns:
            bool: True if the message is any potential command (starts with a '!', '/' or '.')
                with the exception of /me.
        """
        return message.startswith(("!", "/", ".")) and not message.startswith("/me")

    def check_if_permissions(self, m: "Message") -> bool:
        """True if the user has heightened permissions.

        E.g. permissions to bypass cooldowns, update settings, disable the bot, etc.
        True for the streamer themselves, and the users set as the allowed users.
        The comparison is case-insensitive, matching the lowercased `allowed_users`
        list and the other user checks in this class.

        Args:
            m (Message): The Message object that was sent from Twitch.
                Has `user` and `channel` attributes.

        Returns:
            bool: True if the user is the channel owner or an allowed user.
        """
        # Only lowercase real strings, so a missing (None) user/channel behaves as before
        user = m.user.lower() if m.user else m.user
        channel = m.channel.lower() if m.channel else m.channel
        return user == channel or user in self.allowed_users

    def check_link(self, message: str) -> bool:
        """True if `message` contains a link.

        Args:
            message (str): The message to check for a link.

        Returns:
            bool: True if the message contains a link.
        """
        return self.link_regex.search(message) is not None
# Script entry point: constructing MarkovChain runs its __init__, which
# ends by starting the websocket bot.
if __name__ == "__main__":
    MarkovChain()