Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Many changes!!! #5

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
key.pem
response.json
trash
trash/*
*.pem
#response.json
trash
trash/*
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
---
title: GET ME LOGGED INTO DUO!
emoji: ÒÓ
sdk: streamlit
app_file: main.py
pinned: false
---

# ruo
Approve Duo Push requests without a phone.

Expand Down
179 changes: 122 additions & 57 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import time
import pathlib
import threading
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA512


import urllib.parse
import io
import base64
import datetime
import email.utils
import json

import requests
import json


class Client:
def __init__(self, akey=None, pkey=None, host=None, code=None, response=None, keyfile=None):
def __init__(self, akey=None, pkey=None, host=None, code=None, response=None, keyfile=None) -> None:
if keyfile:
self.import_key(keyfile)
else:
Expand All @@ -32,13 +31,13 @@ def __init__(self, akey=None, pkey=None, host=None, code=None, response=None, ke
if response:
self.import_response(response)

def __str__(self):
def __str__(self) -> str:
return repr(self)

def __repr__(self) -> str:
return "Client("+",".join([(self.__dict__[i] or '') and (i+'='+self.__dict__[i]) for i in ["akey", "pkey", "host"]])+")"
return "Client(" + ",".join([(self.__dict__[i] or '') and (i + '=' + self.__dict__[i]) for i in ["akey", "pkey", "host"]]) + ")"

def import_key(self, keyfile):
def import_key(self, keyfile) -> None:
if issubclass(type(keyfile), io.IOBase):
self.pubkey = RSA.import_key(keyfile.read())
else:
Expand All @@ -48,22 +47,22 @@ def import_key(self, keyfile):
with open(keyfile, "rb") as f:
self.pubkey = RSA.import_key(f.read())

def export_key(self, file):
def export_key(self, file) -> None:
if type(file) is str:
with open(file, "wb") as f:
f.write(self.pubkey.export_key("PEM"))
else:
file.write(self.pubkey.export_key("PEM"))

def read_code(self, code):
def read_code(self, code) -> None:
code, host = map(lambda x: x.strip("<>"), code.split("-"))
missing_padding = len(host) % 4
if missing_padding:
host += '=' * (4 - missing_padding)
self.code = code
self.host = base64.decodebytes(host.encode("ascii")).decode('ascii')

def import_response(self, response):
def import_response(self, response) -> None:
if type(response) is str:
with open(response, "r") as f:
response = json.load(f)
Expand All @@ -77,18 +76,33 @@ def import_response(self, response):
self.akey = response["akey"]
self.pkey = response["pkey"]

def export_response(self):
def export_response(self) -> None:
if self.host and ("host" not in self.info or not self.info["host"]):
self.info["host"] = self.host
with open("response.json", 'w') as f:
json.dump(self.info, f)

def activate(self):
def activate(self) -> None:
if self.code:
# set up URL parameters
# taken from https://github.com/FreshSupaSulley/DuOSU
params = {"customer_protocol": "1", "pubkey": self.pubkey.publickey().export_key("PEM").decode('ascii'), "pkpush": "rsa-sha512", "jailbroken": "false", "architecture": "arm64", "region": "US", "app_id": "com.duosecurity.duomobile", "full_disk_encryption": "true",
"passcode_status": "true", "platform": "Android", "app_version": "3.49.0", "app_build_number": "323001", "version": "11", "manufacturer": "unknown", "language": "en", "model": "Browser Extension", "security_patch_level": "2021-02-01"}
params = {
"customer_protocol": "1",
"pubkey": self.pubkey.publickey().export_key("PEM").decode('ascii'),
"pkpush": "rsa-sha512",
"jailbroken": "false",
"architecture": "arm64",
"region": "US",
"app_id": "com.duosecurity.duomobile",
"full_disk_encryption": True,
"passcode_status": True,
"app_version": "4.59.0",
"app_build_number": "459010",
"version": "13",
"manufacturer": "unknown",
"language": "en",
"security_patch_level": "2022-11-05"
}
# send activation request
r = requests.post(
f"https://{self.host}/push/v2/activation/{self.code}", params=params)
Expand All @@ -100,33 +114,45 @@ def activate(self):
raise ValueError("Code is null")

def generate_signature(self, method, path, time, data):
assert isinstance(self.host, str)
assert isinstance(self.pkey, str)
message = (time + "\n" + method + "\n" + self.host.lower() + "\n" +
path + '\n' + urllib.parse.urlencode(data)).encode('ascii')
print(message)
print(message.decode('utf-8'))

h = SHA512.new(message)
signature = pkcs1_15.new(self.pubkey).sign(h)
auth = ("Basic "+base64.b64encode((self.pkey + ":" +
auth = ("Basic " + base64.b64encode((self.pkey + ":" +
base64.b64encode(signature).decode('ascii')).encode('ascii')).decode('ascii'))
return auth

def get_transactions(self):
dt = datetime.datetime.utcnow()
def get_transactions(self) -> dict:
dt = datetime.datetime.now(datetime.UTC)
time = email.utils.format_datetime(dt)
path = "/push/v2/device/transactions"
data = {"akey": self.akey, "fips_status": "1",
"hsm_status": "true", "pkpush": "rsa-sha512"}
data = {
"akey": self.akey,
"fips_status": "1",
"hsm_status": "true",
"pkpush": "rsa-sha512",
}

signature = self.generate_signature("GET", path, time, data)
r = requests.get(f"https://{self.host}{path}", params=data, headers={
"Authorization": signature, "x-duo-date": time, "host": self.host})
r = requests.get(
f"https://{self.host}{path}",
params=data, headers={
"Authorization": signature,
"x-duo-date": time,
"host": self.host,
}
)

return r.json()

def reply_transaction(self, transactionid, answer):
dt = datetime.datetime.utcnow()
dt = datetime.datetime.now(datetime.UTC)
time = email.utils.format_datetime(dt)
path = "/push/v2/device/transactions/"+transactionid
path = "/push/v2/device/transactions/" + transactionid
data = {"akey": self.akey, "answer": answer, "fips_status": "1",
"hsm_status": "true", "pkpush": "rsa-sha512"}

Expand All @@ -135,40 +161,95 @@ def reply_transaction(self, transactionid, answer):
# data["push_received"] = True
# data["pull_to_refresh_used"] = True
signature = self.generate_signature("POST", path, time, data)
r = requests.post(f"https://{self.host}{path}", data=data, headers={
"Authorization": signature, "x-duo-date": time, "host": self.host, "txId": transactionid})
r = requests.post(
f"https://{self.host}{path}",
data=data, headers={
"Authorization": signature,
"x-duo-date": time,
"host": self.host,
"txId": transactionid,
}
)

return r.json()

def register(self, token):
dt = datetime.datetime.utcnow()
def register(self, token) -> dict:
dt = datetime.datetime.now(datetime.UTC)
time = email.utils.format_datetime(dt)
path = "/push/v2/device/registration"
data = {"akey": self.akey, "token": token}


# if answer == "approve":
# data["touch_id"] = False
# data["push_received"] = True
# data["pull_to_refresh_used"] = True
signature = self.generate_signature("POST", path, time, data)
r = requests.post(f"https://{self.host}{path}", data=data, headers={
"Authorization": signature, "x-duo-date": time, "host": self.host})
def device_info(self):
dt = datetime.datetime.utcnow()
r = requests.post(
f"https://{self.host}{path}",
data=data,
headers={
"Authorization": signature,
"x-duo-date": time,
"host": self.host,
}
)
return r.json()

def device_info(self) -> dict:
dt = datetime.datetime.now(datetime.UTC)
time = email.utils.format_datetime(dt)
path = "/push/v2/device/info"
data = {"akey": self.akey, "fips_status": "1",
"hsm_status": "true", "pkpush": "rsa-sha512"}
data = {
"akey": self.akey,
"fips_status": "1",
"hsm_status": "true",
"pkpush": "rsa-sha512"
}

signature = self.generate_signature("GET", path, time, data)
r = requests.get(f"https://{self.host}{path}", params=data, headers={
"Authorization": signature, "x-duo-date": time, "host": self.host})
r = requests.post(
f"https://{self.host}{path}",
params=data,
headers={
"Authorization": signature,
"x-duo-date": time,
"host": self.host,
}
)
return r.json()


def loop_each(c: Client) -> None:
try:
r = c.get_transactions()
except requests.exceptions.ConnectionError:
print("Connection Error")
time.sleep(5)
return

if r["stat"] == "FAIL":
print(r)
return

t = r["response"]["transactions"]
print("Checking for transactions")
if len(t):
for tx in t:
print(tx)
c.reply_transaction(tx["urgid"], 'approve')
time.sleep(2)
else:
print("No transactions")
time.sleep(10)


def loop(*a, **kwa):
while True:
loop_each(*a, **kwa)

# c = Client(response="response.json",keyfile="mykey.pem",code="")
# print(c)
# #print(c.get_transactions())
# print(c.get_transactions())
# print(c.reply_transaction("","approve"))


Expand Down Expand Up @@ -200,25 +281,9 @@ def main():
c.activate()
c.export_response()

while True:
try:
r = c.get_transactions()
except requests.exceptions.ConnectionError:
print("Connection Error")
time.sleep(5)
continue

t = r["response"]["transactions"]
print("Checking for transactions")
if len(t):
for tx in t:
print(tx)
c.reply_transaction(tx["urgid"], 'approve')
time.sleep(2)
else:
print("No transactions")

time.sleep(10)
l = threading.Thread(target=loop, args=(c,), daemon=True)
l.start()
input()


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions response.json

Large diffs are not rendered by default.