8
8
9
9
class Audio :
10
10
bvid = ""
11
- cid = ""
12
11
title = ""
13
12
playUrl = "http://api.bilibili.com/x/player/playurl"
14
- baseUrl = ""
13
+ part_list = []
15
14
16
15
def __init__ (self , bvid : str ) -> None :
17
16
if bvid is None :
@@ -25,17 +24,8 @@ def __init__(self, bvid: str) -> None:
25
24
self .__get_cid_title (bvid )
26
25
else :
27
26
# AV号
28
- self .__get_cid_title (bvid [:12 ], int ( bvid [ 13 :]) )
27
+ self .__get_cid_title (bvid [:12 ])
29
28
30
- params = {
31
- "fnval" : 16 ,
32
- "bvid" : self .bvid ,
33
- "cid" : self .cid ,
34
- }
35
-
36
- self .baseUrl = requests .get (self .playUrl , params = params ).json ()["data" ]["dash" ][
37
- "audio"
38
- ][0 ]["baseUrl" ]
39
29
40
30
def download (self ):
41
31
headers = {
@@ -51,35 +41,46 @@ def download(self):
51
41
52
42
start_time = time .time ()
53
43
try :
54
- response = requests .get (url = self .baseUrl , headers = headers , stream = True )
55
-
56
- total_size = int (response .headers .get ("content-length" , 0 ))
57
- temp_size = 0
58
-
59
- # 如果文件已存在,则跳过下载
60
- if os .path .exists (self .title + ".mp3" ):
61
- typer .echo (f"{ self .title } already exists, skip for now" )
62
- return
63
-
64
- with open (self .title + ".mp3" , "wb" ) as f :
65
- for chunk in response .iter_content (chunk_size = 1024 ):
66
- if chunk :
67
- f .write (chunk )
68
- f .flush ()
69
-
70
- temp_size += len (chunk )
71
- done = int (50 * temp_size / total_size )
72
- sys .stdout .write (
73
- "\r [%s%s] %s/%s %s"
74
- % (
75
- "#" * done ,
76
- "-" * (50 - done ),
77
- temp_size ,
78
- total_size ,
79
- self .title ,
44
+ for cid , part in zip (self .cid_list , self .part_list ):
45
+ baseUrl = requests .get (
46
+ self .playUrl ,
47
+ params = {
48
+ "fnval" : 16 ,
49
+ "bvid" : self .bvid ,
50
+ "cid" : cid ,
51
+ },
52
+ ).json ()["data" ]["dash" ]["audio" ][0 ]["baseUrl" ]
53
+
54
+ response = requests .get (url = baseUrl , headers = headers , stream = True )
55
+
56
+ total_size = int (response .headers .get ("content-length" , 0 ))
57
+ temp_size = 0
58
+
59
+ # 如果文件已存在,则跳过下载
60
+ file_path = f"{ self .title } -{ part } .mp3"
61
+ if os .path .exists (file_path ):
62
+ typer .echo (f"{ self .title } already exists, skip for now" )
63
+ return
64
+
65
+ with open (file_path , "wb" ) as f :
66
+ for chunk in response .iter_content (chunk_size = 1024 ):
67
+ if chunk :
68
+ f .write (chunk )
69
+ f .flush ()
70
+
71
+ temp_size += len (chunk )
72
+ done = int (50 * temp_size / total_size )
73
+ sys .stdout .write (
74
+ "\r [%s%s] %s/%s %s"
75
+ % (
76
+ "#" * done ,
77
+ "-" * (50 - done ),
78
+ temp_size ,
79
+ total_size ,
80
+ self .title ,
81
+ )
80
82
)
81
- )
82
- sys .stdout .flush ()
83
+ sys .stdout .flush ()
83
84
84
85
except Exception as e :
85
86
typer .echo ("Download failed" )
@@ -92,24 +93,29 @@ def download(self):
92
93
" " + str (round (end_time - start_time , 2 )) + " seconds download finish\n "
93
94
)
94
95
95
- def __get_cid_title (self , bvid : str , p : int = 1 ):
96
+ def __get_cid_title (self , bvid : str ):
96
97
url = "https://api.bilibili.com/x/web-interface/view?bvid={bvid}" .format (
97
98
bvid = bvid
98
99
)
99
100
try :
100
101
response = requests .get (url )
101
102
data = response .json ().get ("data" )
102
- self .title = data .get ("title" )
103
+ self .title = self . __title_process ( data .get ("title" ) )
103
104
104
105
# 这里是否也应该也使用get方法?
105
- self .cid = str (data ["pages" ][p - 1 ]["cid" ])
106
+ self .cid_list = [str (page ["cid" ]) for page in data ["pages" ]]
107
+ self .part_list = [self .__title_process (str (page ["part" ])) for page in data ["pages" ]]
106
108
107
109
except ValueError as e :
108
110
raise e
109
111
110
112
except Exception as e :
111
113
raise e
112
114
115
+
116
+ def __title_process (self , title : str ):
113
117
replaceList = ["?" , "\\ " , "*" , "|" , "<" , ">" , ":" , "/" , " " ]
114
118
for ch in replaceList :
115
- self .title = self .title .replace (ch , "-" )
119
+ title = title .replace (ch , "-" )
120
+
121
+ return title
0 commit comments