-
Notifications
You must be signed in to change notification settings - Fork 0
/
main-otp.py
77 lines (69 loc) · 2.84 KB
/
main-otp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
from sqlalchemy.orm import sessionmaker
from functions import *
import argparse, sys, pyotp
import sys
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument("--user", action="store", dest="user", help="username")
parser.add_argument("--code", action="store", dest="code", help="otp code")
parser.add_argument("--generate_reserve_codes", action="store_true", dest="generator", help="this option need for initial generating reserve codes for all users")
args = parser.parse_args()
DBSession = sessionmaker(bind=engine)
session = DBSession()
if args.generator:
generator = args.generator
else:
generator = False
if generator:
all_users = session.query(User.vpn_username)
for username in all_users:
print(username[0])
db_one_time_codes_as_list = generateNewReserveCodes()
try:
new_one_time_codes_field = ','.join([str(elem) for elem in db_one_time_codes_as_list])
session.query(User.one_time_code).filter(User.vpn_username == username[0]).update(
{User.one_time_code: str(new_one_time_codes_field)})
session.flush()
session.commit()
except:
print("can't update one_time_codes for user %s" % username)
print("All user has been updated")
else:
if args.user:
username = args.user
else:
print("can't process without user")
sys.exit(1)
if args.code:
code = args.code
else:
code = ""
db_one_time_codes = session.query(User.one_time_code).filter(User.vpn_username == username).one()
is_2fa_disabled = session.query(User.skip_2fa).filter(User.vpn_username == username).one()
db_one_time_codes_as_list = db_one_time_codes[0].split(",")
if is_2fa_disabled[0] == 0:
try:
db_one_time_codes_as_list.index(code)
db_one_time_codes_as_list.remove(code)
if len(db_one_time_codes_as_list) == 0:
db_one_time_codes_as_list = generateNewReserveCodes()
new_one_time_codes_field = ','.join([str(elem) for elem in db_one_time_codes_as_list])
session.query(User.one_time_code).filter(User.vpn_username == username).update({User.one_time_code: str(new_one_time_codes_field)})
session.flush()
session.commit()
one_time_code_authorized = 1
print("Auth-Type = Accept\n")
except:
one_time_code_authorized = 0
if one_time_code_authorized == 0:
pin = session.query(User.pin).filter(User.vpn_username == username).one()[0]
hotp = pyotp.TOTP(pin)
if hotp.verify(code):
print("Auth-Type = Accept\n")
sys.exit(0)
else:
print("Auth-Type = Denied\n")
sys.exit(1)
else:
print("Auth-Type = Accept\n")
sys.exit(0)