-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathtrojan_manager.py
executable file
·399 lines (359 loc) · 13 KB
/
trojan_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Name: Trojan Manager
Dev: K4YT3X
Date Created: July 8, 2018
Last Modified: March 5, 2019
Licensed under the GNU General Public License Version 3 (GNU GPL v3),
available at: https://www.gnu.org/licenses/gpl-3.0.txt
Copyright (C) 2018-2019, K4YT3X <[email protected]>
"""
from avalon_framework import Avalon
from prettytable import PrettyTable
import hashlib
import MySQLdb
import readline
import sys
import traceback
VERSION = '1.3.7'
COMMANDS = [
"CreateUserTable",
"TruncateUserTable",
"Verify",
"AddUser",
"DelUser",
"Show",
"SetQuota",
"AddQuota",
"ClearUsage",
"Exit",
"Quit",
]
def show_affection(function):
""" Shows cursor execution affected rows
"""
def wrapper(*args, **kwargs):
function(*args, **kwargs)
Avalon.debug_info('{} row(s) affected'.format(args[0].cursor.rowcount))
return wrapper
def catch_mysql_errors(function):
""" Catch mysqldb warnings and errors
"""
def wrapper(*args, **kwargs):
try:
function(*args, **kwargs)
except (MySQLdb.Error, MySQLdb.Warning) as e:
Avalon.error(e)
return 1
return wrapper
class ShellCompleter(object):
""" Completer for readline
This class tries to match input text
to a list of available commands.
"""
def __init__(self, options):
self.options = sorted(options)
def complete(self, text, state):
if state == 0:
if text:
self.matches = [s for s in self.options if s and s.lower().startswith(text.lower())]
else:
self.matches = self.options[:]
try:
return self.matches[state]
except IndexError:
return None
class TrojanDatabase:
""" Trojan Database Connector
This class generates objects that
connects to and controls the trojan
database.
"""
def __init__(self, db_host, db_user, db_pass, db, table):
""" Initialize database connection
"""
self.db_host = db_host
self.db_user = db_user
self.db_pass = db_pass
self.db = db
self.table = table
self.connection = MySQLdb.connect(self.db_host, self.db_user, self.db_pass, self.db)
self.cursor = self.connection.cursor()
@show_affection
@catch_mysql_errors
def create_user_table(self):
""" Create new user table
"""
sql = ["CREATE TABLE {} (".format(self.table),
"id INT UNSIGNED NOT NULL AUTO_INCREMENT,",
"username VARCHAR(64) NOT NULL,",
"password CHAR(56) NOT NULL,",
"quota BIGINT NOT NULL DEFAULT 0,",
"download BIGINT UNSIGNED NOT NULL DEFAULT 0,",
"upload BIGINT UNSIGNED NOT NULL DEFAULT 0,",
"PRIMARY KEY (id),",
"INDEX (password)",
");",
]
self.cursor.execute(''.join(sql))
self.connection.commit()
return 0
@show_affection
@catch_mysql_errors
def truncate_user_table(self):
""" truncate the user table
"""
self.cursor.execute('TRUNCATE {};'.format(self.table))
self.connection.commit()
return 0
@show_affection
@catch_mysql_errors
def drop_user_table(self):
""" Boom
"""
self.cursor.execute('DROP TABLE {};'.format(self.table))
self.connection.commit()
return 0
@show_affection
@catch_mysql_errors
def add_user(self, username, password):
""" Add new user into database
"""
self.cursor.execute("SELECT * FROM {} WHERE username = '{}'".format(self.table, username))
if self.cursor.fetchone() is not None:
Avalon.error('User {} already exists'.format(username))
self.cursor.rowcount = 0 # No actual changes to database
return 1
fullhash = hashlib.sha224('{}:{}'.format(username, password).encode('utf-8')).hexdigest()
self.cursor.execute("INSERT INTO {} (username, password) VALUES ('{}', '{}')".format(self.table, username, fullhash))
# self.cursor.execute("INSERT INTO {} SHA2(CONCAT(username, ':', password)', 224) VALUES ('{}', '{}')".format(self.table, username, password))
# self.connection.commit()
return 0
@show_affection
@catch_mysql_errors
def del_user(self, username):
""" Delete a user from the database
"""
self.cursor.execute("DELETE FROM {} WHERE username = '{}'".format(self.table, username))
self.connection.commit()
return 0
@catch_mysql_errors
def user_exists(self, username):
""" Determines if a user exists in the database
"""
self.cursor.execute("SELECT * FROM {} WHERE username = '{}'".format(self.table, username))
user_id = self.cursor.fetchone()
if user_id is not None:
return True
return False
@catch_mysql_errors
def show_users(self, show_quota=False):
""" List users
If show_quota is True, include quota and data usage
"""
total_users = self.cursor.execute("SELECT * FROM {}".format(self.table))
columns = ['ID', 'Username', 'Password']
if show_quota:
columns += ['Quota', 'Download', 'Upload']
table = PrettyTable(columns)
for user in self.cursor.fetchall():
if show_quota:
table.add_row(user)
else:
table.add_row([user[0], user[1], user[2]])
print(table)
Avalon.info('Query complete, {} user(s) found in database'.format(total_users))
return 0
def convert_units(self, data):
""" Convert data unit into bytes
"""
try:
try:
return int(data)
except ValueError:
if data[-1].lower() == 'k':
return int(data[:-1]) * 1024
elif data[-1].lower() == 'm':
return int(data[:-1]) * 1024 ** 2
elif data[-1].lower() == 'g':
return int(data[:-1]) * 1024 ** 3
elif data[-1].lower() == 't':
return int(data[:-1]) * 1024 ** 4
elif data[-1].lower() == 'p':
return int(data[:-1]) * 1024 ** 5
else:
return False
except ValueError:
return False
@show_affection
@catch_mysql_errors
def set_quota(self, username, quota):
""" Set user quota to a specific value
"""
converted = self.convert_units(quota)
if not converted:
Avalon.error('Invalid quota input')
return 1
self.cursor.execute("UPDATE {} SET quota = {} WHERE username = '{}'".format(self.table, converted, username))
self.connection.commit()
return 0
@show_affection
@catch_mysql_errors
def add_quota(self, username, appended_quota):
""" Append quota
"""
converted = self.convert_units(appended_quota)
if not converted:
Avalon.error('Invalid quota input')
return 1
self.cursor.execute("UPDATE {} SET quota = quota + {} WHERE username = '{}'".format(self.table, converted, username))
self.connection.commit()
return 0
@show_affection
@catch_mysql_errors
def clear_usage(self, username):
""" Clear user data usage
"""
self.cursor.execute("UPDATE {} SET download = 0, upload = 0 WHERE username = '{}'".format(self.table, username))
return 0
@catch_mysql_errors
def verify(self, fullhash):
""" Verify if user credentials are valid
"""
valid_hashes = []
self.cursor.execute("SELECT * FROM {}".format(self.table))
all_users = self.cursor.fetchall()
for user in all_users:
valid_hashes.append(user[2])
if fullhash in valid_hashes:
Avalon.info('Valid user')
return 0
Avalon.warning('Invalid user')
return 1
def print_legal_info():
print('Trojan Manager {}'.format(VERSION))
print('(C) 2018 K4YT3X')
print('Licensed under GNU GPL v3')
def print_help():
help_lines = [
"\n{}Commands are not case-sensitive{}".format(Avalon.FM.BD, Avalon.FM.RST),
"CreateUserTable",
"TruncateUserTable",
"Verify [hash]",
"AddUser [username] [password]",
"DelUser [username]",
"Show (users / quota)",
"SetQuota [quota]",
"AddQuota [quota]",
"ClearUsage [username]",
"Interactive / int"
"Exit / Quit",
"",
]
for line in help_lines:
print(line)
def command_interpreter(db_connection, commands):
""" Trojan shell command interpreter
"""
try:
# Try to guess what the user is saying
possibilities = [s for s in COMMANDS if s.lower().startswith(commands[1])]
if len(possibilities) == 1:
commands[1] = possibilities[0]
if commands[1].replace(' ', '') == '':
result = 0
elif commands[1].lower() == 'help':
print_help()
result = 0
elif commands[1].lower() == 'createusertable':
result = db_connection.create_user_table()
elif commands[1].lower() == 'truncateusertable':
Avalon.warning('By truncating you will LOSE ALL USER DATA')
if Avalon.ask('Are you sure you want to truncate?'):
result = db_connection.truncate_user_table()
else:
Avalon.warning('Operation canceled')
result = 0
elif commands[1].lower() == 'dropusertable':
Avalon.warning('By dropping the table you will LOSE ALL USER DATA')
if Avalon.ask('Are you sure you want to drop the table?'):
result = db_connection.drop_user_table()
else:
Avalon.warning('Operation canceled')
result = 0
elif commands[1].lower() == 'verify':
result = db_connection.verify(commands[2])
elif commands[1].lower() == 'adduser':
result = db_connection.add_user(commands[2], commands[3])
elif commands[1].lower() == 'deluser':
result = db_connection.del_user(commands[2])
elif commands[1].lower() == 'show':
if commands[2].lower() == 'users':
result = db_connection.show_users()
elif commands[2].lower() == 'quota':
result = db_connection.show_users(show_quota=True)
elif commands[1].lower() == 'setquota':
result = db_connection.set_quota(commands[2], commands[3])
elif commands[1].lower() == 'addquota':
result = db_connection.add_quota(commands[2], commands[3])
elif commands[1].lower() == 'clearusage':
result = db_connection.clear_usage(commands[2])
elif commands[1].lower() == 'exit' or commands[1].lower() == 'quit':
Avalon.warning('Exiting')
exit(0)
elif len(possibilities) > 0:
Avalon.warning('Ambiguous command \"{}\"'.format(commands[1]))
print('Use \"Help\" command to list available commands')
result = 1
else:
Avalon.error('Invalid command')
print('Use \"Help\" command to list available commands')
result = 1
return result
except IndexError:
Avalon.error('Invalid arguments')
print('Use \"Help\" command to list available commands')
result = 0
def main():
""" Trojan Manager main function
This function can only be executed when
this file is not being imported.
"""
# Create database controller connection
try:
trojan_db = TrojanDatabase('127.0.0.1', 'trojan', 'thisisthetrojandbpassword', 'trojan', 'users')
except (MySQLdb.OperationalError) as e:
Avalon.error('Error establishing connection to MySQL/MariaDB')
Avalon.error('Please check your settings')
traceback.print_exc()
exit(1)
# Begin command interpreting
try:
if sys.argv[1].lower() == 'interactive' or sys.argv[1].lower() == 'int':
print_legal_info()
# Set command completer
completer = ShellCompleter(COMMANDS)
readline.set_completer(completer.complete)
readline.parse_and_bind('tab: complete')
# Launch interactive trojan shell
prompt = '{}[trojan]> {}'.format(Avalon.FM.BD, Avalon.FM.RST)
while True:
command_interpreter(trojan_db, [''] + input(prompt).split(' '))
else:
# Return to shell with command return value
exit(command_interpreter(trojan_db, sys.argv[0:]))
except IndexError:
Avalon.warning('No commands specified')
exit(0)
except (KeyboardInterrupt, EOFError):
Avalon.warning('Exiting')
exit(0)
except Exception:
Avalon.error('Exception caught')
traceback.print_exc()
exit(1)
if __name__ == '__main__':
main()
else:
Avalon.warning('This file cannot be imported')