From da1e6d13361e149ff00f68f1ba0bf80e446d08c0 Mon Sep 17 00:00:00 2001 From: JoeanAmier Date: Thu, 9 Jan 2025 21:37:51 +0800 Subject: [PATCH] =?UTF-8?q?feat(service/douyin/logic/common.py):=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20a=5Fbogus=20=E7=AD=BE=E5=90=8D=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ABogus 类,用于生成 a_bogus 签名 - 添加相关导入和常量定义 - 实现生成签名所需的各种方法和逻辑 - 优化代码格式和结构 --- service/douyin/logic/common.py | 572 ++++++++++++++++++++++++++++++++- 1 file changed, 564 insertions(+), 8 deletions(-) diff --git a/service/douyin/logic/common.py b/service/douyin/logic/common.py index 21f5587..643b013 100644 --- a/service/douyin/logic/common.py +++ b/service/douyin/logic/common.py @@ -1,3 +1,11 @@ +from gmssl import sm3, func +from urllib.parse import urlencode +from urllib.parse import quote +from time import time +from re import compile +from random import random +from random import randint +from random import choice from lib.logger import logger import execjs from lib import requests @@ -12,12 +20,12 @@ 'aid': '6383', 'channel': 'channel_pc_web', 'update_version_code': '170400', - 'pc_client_type': '1', # Windows + 'pc_client_type': '1', # Windows 'version_code': '190500', 'version_name': '19.5.0', 'cookie_enabled': 'true', - 'screen_width': '2560', # from cookie dy_swidth - 'screen_height': '1440', # from cookie dy_sheight + 'screen_width': '2560', # from cookie dy_swidth + 'screen_height': '1440', # from cookie dy_sheight 'browser_language': 'zh-CN', 'browser_platform': 'Win32', 'browser_name': 'Chrome', @@ -40,8 +48,10 @@ # 'a_bogus': 'xJRwQfLfDkdsgDyh54OLfY3q66M3YQnV0trEMD2f5V3WF639HMPh9exLx-TvU6DjNs%2FDIeEjy4haT3nprQVH8qw39W4x%2F2CgQ6h0t-P2so0j53iJCLgmE0hE4vj3SlF85XNAiOk0y7ICKY00AInymhK4bfebY7Y6i6tryE%3D%3D' # sign } -COMMON_HEADERS ={ - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36", +USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" + +COMMON_HEADERS = { + "User-Agent": USERAGENT, "sec-fetch-site": "same-origin", "sec-fetch-mode": "cors", "sec-fetch-dest": "empty", @@ -59,6 +69,7 @@ DOUYIN_SIGN = execjs.compile(open('lib/js/douyin.js', encoding='utf-8').read()) + async def get_webid(headers: dict): url = 'https://www.douyin.com/?recommend=1' logger.info( @@ -76,16 +87,18 @@ async def get_webid(headers: dict): return match.group(1) return None + def cookies_to_dict(cookie_string) -> dict: cookies = cookie_string.split('; ') cookie_dict = {} for cookie in cookies: if cookie == '' or cookie == 'douyin.com': continue - key, value = cookie.split('=', 1)[0], cookie.split('=', 1)[1] + key, value = cookie.split('=', 1)[0], cookie.split('=', 1)[1] cookie_dict[key] = value return cookie_dict + async def deal_params(params: dict, headers: dict) -> dict: cookie = headers.get('cookie') or headers.get('Cookie') if not cookie: @@ -101,6 +114,7 @@ async def deal_params(params: dict, headers: dict) -> dict: params['webid'] = await get_webid(headers) return params + def get_ms_token(randomlength=120): """ 根据传入长度产生随机字符串 @@ -112,6 +126,7 @@ def get_ms_token(randomlength=120): random_str += base_str[random.randint(0, length)] return random_str + async def common_request(uri: str, params: dict, headers: dict) -> tuple[dict, bool]: """ 请求 douyin @@ -125,7 +140,8 @@ async def common_request(uri: str, params: dict, headers: dict) -> tuple[dict, b headers.update(COMMON_HEADERS) params = await deal_params(params, headers) - query = '&'.join([f'{k}={urllib.parse.quote(str(v))}' for k, v in params.items()]) + query = '&'.join( + [f'{k}={urllib.parse.quote(str(v))}' for k, v in params.items()]) call_name = 'sign_datail' if 'reply' in uri: call_name = 'sign_reply' @@ -147,4 +163,544 @@ async def common_request(uri: str, params: dict, headers: dict) -> tuple[dict, b f'url: {url}, params: {params}, request error, code: {response.status_code}, body: {response.text}') return response.json(), False - return response.json(), True \ No newline at end of file + return response.json(), True + + +class ABogus: + __filter = compile(r'%([0-9A-F]{2})') + __arguments = [0, 1, 14] + __ua_key = "\u0000\u0001\u000e" + __end_string = "cus" + __version = [1, 0, 1, 5] + __browser = "1536|742|1536|864|0|0|0|0|1536|864|1536|864|1536|742|24|24|Win32" + __reg = [ + 1937774191, + 1226093241, + 388252375, + 3666478592, + 2842636476, + 372324522, + 3817729613, + 2969243214, + ] + __str = { + "s0": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=", + "s1": "Dkdpgh4ZKsQB80/Mfvw36XI1R25+WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=", + "s2": "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=", + "s3": "ckdp1h4ZKsUB80/Mfvw36XIgR25+WQAlEi7NLboqYTOPuzmFjJnryx9HVGDaStCe", + "s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe", + } + + def __init__(self, + user_agent: str = USERAGENT, + platform: str = None, ): + self.chunk = [] + self.size = 0 + self.reg = self.__reg[:] + self.ua_code = self.generate_ua_code(user_agent) + self.browser = self.generate_browser_info( + platform) if platform else self.__browser + self.browser_len = len(self.browser) + self.browser_code = self.char_code_at(self.browser) + + @classmethod + def list_1(cls, random_num=None, a=170, b=85, c=45, ) -> list: + return cls.random_list( + random_num, + a, + b, + 1, + 2, + 5, + c & a, + ) + + @classmethod + def list_2(cls, random_num=None, a=170, b=85, ) -> list: + return cls.random_list( + random_num, + a, + b, + 1, + 0, + 0, + 0, + ) + + @classmethod + def list_3(cls, random_num=None, a=170, b=85, ) -> list: + return cls.random_list( + random_num, + a, + b, + 1, + 0, + 5, + 0, + ) + + @staticmethod + def random_list( + a: float = None, + b=170, + c=85, + d=0, + e=0, + f=0, + g=0, + ) -> list: + r = a or (random() * 10000) + v = [ + r, + int(r) & 255, + int(r) >> 8, + ] + s = v[1] & b | d + v.append(s) + s = v[1] & c | e + v.append(s) + s = v[2] & b | f + v.append(s) + s = v[2] & c | g + v.append(s) + return v[-4:] + + @staticmethod + def from_char_code(*args): + return "".join(chr(code) for code in args) + + @classmethod + def generate_string_1( + cls, + random_num_1=None, + random_num_2=None, + random_num_3=None, + ): + return cls.from_char_code(*cls.list_1(random_num_1)) + cls.from_char_code( + *cls.list_2(random_num_2)) + cls.from_char_code(*cls.list_3(random_num_3)) + + def generate_string_2( + self, + url_params: str, + method="GET", + start_time=0, + end_time=0, + ) -> str: + a = self.generate_string_2_list( + url_params, + method, + start_time, + end_time, + ) + e = self.end_check_num(a) + a.extend(self.browser_code) + a.append(e) + return self.rc4_encrypt(self.from_char_code(*a), "y") + + def generate_ua_code(self, user_agent: str) -> list[int]: + u = self.rc4_encrypt(user_agent, self.__ua_key) + u = self.generate_result(u, "s3") + return self.sum(u) + + def generate_string_2_list( + self, + url_params: str, + method="GET", + start_time=0, + end_time=0, + ) -> list: + start_time = start_time or int(time() * 1000) + end_time = end_time or (start_time + randint(4, 8)) + params_array = self.generate_params_code(url_params) + method_array = self.generate_method_code(method) + return self.list_4( + (end_time >> 24) & 255, + params_array[21], + self.ua_code[23], + (end_time >> 16) & 255, + params_array[22], + self.ua_code[24], + (end_time >> 8) & 255, + (end_time >> 0) & 255, + (start_time >> 24) & 255, + (start_time >> 16) & 255, + (start_time >> 8) & 255, + (start_time >> 0) & 255, + method_array[21], + method_array[22], + int(end_time / 256 / 256 / 256 / 256) >> 0, + int(start_time / 256 / 256 / 256 / 256) >> 0, + self.browser_len, + ) + + @staticmethod + def reg_to_array(a): + o = [0] * 32 + for i in range(8): + c = a[i] + o[4 * i + 3] = (255 & c) + c >>= 8 + o[4 * i + 2] = (255 & c) + c >>= 8 + o[4 * i + 1] = (255 & c) + c >>= 8 + o[4 * i] = (255 & c) + + return o + + def compress(self, a): + f = self.generate_f(a) + i = self.reg[:] + for o in range(64): + c = self.de(i[0], 12) + i[4] + self.de(self.pe(o), o) + c = (c & 0xFFFFFFFF) + c = self.de(c, 7) + s = (c ^ self.de(i[0], 12)) & 0xFFFFFFFF + + u = self.he(o, i[0], i[1], i[2]) + u = (u + i[3] + s + f[o + 68]) & 0xFFFFFFFF + + b = self.ve(o, i[4], i[5], i[6]) + b = (b + i[7] + c + f[o]) & 0xFFFFFFFF + + i[3] = i[2] + i[2] = self.de(i[1], 9) + i[1] = i[0] + i[0] = u + + i[7] = i[6] + i[6] = self.de(i[5], 19) + i[5] = i[4] + i[4] = (b ^ self.de(b, 9) ^ self.de(b, 17)) & 0xFFFFFFFF + + for l in range(8): + self.reg[l] = (self.reg[l] ^ i[l]) & 0xFFFFFFFF + + @classmethod + def generate_f(cls, e): + r = [0] * 132 + + for t in range(16): + r[t] = (e[4 * t] << 24) | (e[4 * t + 1] << + 16) | (e[4 * t + 2] << 8) | e[4 * t + 3] + r[t] &= 0xFFFFFFFF + + for n in range(16, 68): + a = r[n - 16] ^ r[n - 9] ^ cls.de(r[n - 3], 15) + a = a ^ cls.de(a, 15) ^ cls.de(a, 23) + r[n] = (a ^ cls.de(r[n - 13], 7) ^ r[n - 6]) & 0xFFFFFFFF + + for n in range(68, 132): + r[n] = (r[n - 68] ^ r[n - 64]) & 0xFFFFFFFF + + return r + + @staticmethod + def pad_array(arr, length=60): + while len(arr) < length: + arr.append(0) + return arr + + def fill(self, length=60): + size = 8 * self.size + self.chunk.append(128) + self.chunk = self.pad_array(self.chunk, length) + for i in range(4): + self.chunk.append((size >> 8 * (3 - i)) & 255) + + @staticmethod + def list_4( + a: int, + b: int, + c: int, + d: int, + e: int, + f: int, + g: int, + h: int, + i: int, + j: int, + k: int, + m: int, + n: int, + o: int, + p: int, + q: int, + r: int, + ) -> list: + return [ + 44, + a, + 0, + 0, + 0, + 0, + 24, + b, + n, + 0, + c, + d, + 0, + 0, + 0, + 1, + 0, + 239, + e, + o, + f, + g, + 0, + 0, + 0, + 0, + h, + 0, + 0, + 14, + i, + j, + 0, + k, + m, + 3, + p, + 1, + q, + 1, + r, + 0, + 0, + 0] + + @staticmethod + def end_check_num(a: list): + r = 0 + for i in a: + r ^= i + return r + + @classmethod + def decode_string(cls, url_string, ): + decoded = cls.__filter.sub(cls.replace_func, url_string) + return decoded + + @staticmethod + def replace_func(match): + return chr(int(match.group(1), 16)) + + @staticmethod + def de(e, r): + r %= 32 + return ((e << r) & 0xFFFFFFFF) | (e >> (32 - r)) + + @staticmethod + def pe(e): + return 2043430169 if 0 <= e < 16 else 2055708042 + + @staticmethod + def he(e, r, t, n): + if 0 <= e < 16: + return (r ^ t ^ n) & 0xFFFFFFFF + elif 16 <= e < 64: + return (r & t | r & n | t & n) & 0xFFFFFFFF + raise ValueError + + @staticmethod + def ve(e, r, t, n): + if 0 <= e < 16: + return (r ^ t ^ n) & 0xFFFFFFFF + elif 16 <= e < 64: + return (r & t | ~r & n) & 0xFFFFFFFF + raise ValueError + + @staticmethod + def convert_to_char_code(a): + d = [] + for i in a: + d.append(ord(i)) + return d + + @staticmethod + def split_array(arr, chunk_size=64): + result = [] + for i in range(0, len(arr), chunk_size): + result.append(arr[i:i + chunk_size]) + return result + + @staticmethod + def char_code_at(s): + return [ord(char) for char in s] + + def write(self, e, ): + self.size = len(e) + if isinstance(e, str): + e = self.decode_string(e) + e = self.char_code_at(e) + if len(e) <= 64: + self.chunk = e + else: + chunks = self.split_array(e, 64) + for i in chunks[:-1]: + self.compress(i) + self.chunk = chunks[-1] + + def reset(self, ): + self.chunk = [] + self.size = 0 + self.reg = self.__reg[:] + + def sum(self, e, length=60): + self.reset() + self.write(e) + self.fill(length) + self.compress(self.chunk) + return self.reg_to_array(self.reg) + + @classmethod + def generate_result_unit(cls, n, s): + r = "" + for i, j in zip(range(18, -1, -6), (16515072, 258048, 4032, 63)): + r += cls.__str[s][(n & j) >> i] + return r + + @classmethod + def generate_result_end(cls, s, e="s4"): + r = "" + b = ord(s[120]) << 16 + r += cls.__str[e][(b & 16515072) >> 18] + r += cls.__str[e][(b & 258048) >> 12] + r += "==" + return r + + @classmethod + def generate_result(cls, s, e="s4"): + r = [] + + for i in range(0, len(s), 3): + if i + 2 < len(s): + n = ( + (ord(s[i]) << 16) + | (ord(s[i + 1]) << 8) + | ord(s[i + 2]) + ) + elif i + 1 < len(s): + n = (ord(s[i]) << 16) | ( + ord(s[i + 1]) << 8 + ) + else: + n = ord(s[i]) << 16 + + for j, k in zip(range(18, -1, -6), + (0xFC0000, 0x03F000, 0x0FC0, 0x3F)): + if j == 6 and i + 1 >= len(s): + break + if j == 0 and i + 2 >= len(s): + break + r.append(cls.__str[e][(n & k) >> j]) + + r.append("=" * ((4 - len(r) % 4) % 4)) + return "".join(r) + + @classmethod + def generate_args_code(cls): + a = [] + for j in range(24, -1, -8): + a.append(cls.__arguments[0] >> j) + a.append(cls.__arguments[1] / 256) + a.append(cls.__arguments[1] % 256) + a.append(cls.__arguments[1] >> 24) + a.append(cls.__arguments[1] >> 16) + for j in range(24, -1, -8): + a.append(cls.__arguments[2] >> j) + return [int(i) & 255 for i in a] + + def generate_method_code(self, method: str = "GET") -> list[int]: + return self.sm3_to_array(self.sm3_to_array(method + self.__end_string)) + + def generate_params_code(self, params: str) -> list[int]: + return self.sm3_to_array(self.sm3_to_array(params + self.__end_string)) + + @classmethod + def sm3_to_array(cls, data: str | list) -> list[int]: + if isinstance(data, str): + b = data.encode("utf-8") + else: + b = bytes(data) # 将 List[int] 转换为字节数组 + + # 将字节数组转换为适合 sm3.sm3_hash 函数处理的列表格式 + h = sm3.sm3_hash(func.bytes_to_list(b)) + + # 将十六进制字符串结果转换为十进制整数列表 + return [int(h[i: i + 2], 16) for i in range(0, len(h), 2)] + + @classmethod + def generate_browser_info(cls, platform: str = "Win32") -> str: + inner_width = randint(1280, 1920) + inner_height = randint(720, 1080) + outer_width = randint(inner_width, 1920) + outer_height = randint(inner_height, 1080) + screen_x = 0 + screen_y = choice((0, 30)) + value_list = [ + inner_width, + inner_height, + outer_width, + outer_height, + screen_x, + screen_y, + 0, + 0, + outer_width, + outer_height, + outer_width, + outer_height, + inner_width, + inner_height, + 24, + 24, + platform, + ] + return "|".join(str(i) for i in value_list) + + @staticmethod + def rc4_encrypt(plaintext, key): + s = list(range(256)) + j = 0 + + for i in range(256): + j = (j + s[i] + ord(key[i % len(key)])) % 256 + s[i], s[j] = s[j], s[i] + + i = 0 + j = 0 + cipher = [] + + for k in range(len(plaintext)): + i = (i + 1) % 256 + j = (j + s[i]) % 256 + s[i], s[j] = s[j], s[i] + t = (s[i] + s[j]) % 256 + cipher.append(chr(s[t] ^ ord(plaintext[k]))) + + return ''.join(cipher) + + def get_value(self, + url_params: dict | str, + method="GET", + start_time=0, + end_time=0, + random_num_1=None, + random_num_2=None, + random_num_3=None, + ) -> str: + string_1 = self.generate_string_1( + random_num_1, + random_num_2, + random_num_3, + ) + string_2 = self.generate_string_2(urlencode(url_params, quote_via=quote, ) if isinstance( + url_params, dict) else url_params, method, start_time, end_time, ) + string = string_1 + string_2 + return self.generate_result(string, "s4")