8
8
from aiohttp_socks import ProxyType , ProxyConnector , ChainProxyConnector
9
9
10
10
11
+ RE_PROXY = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,6}'
12
+
13
+
11
14
class ProxyObject :
12
15
def __init__ (self , ip , port ):
13
16
self .ip = ip
@@ -26,7 +29,7 @@ async def check_url(self, url):
26
29
try :
27
30
connector = ProxyConnector .from_url (self .get_str ())
28
31
async with aiohttp .ClientSession (connector = connector ) as _sess :
29
- response = await _sess .get (url , timeout = 2 )
32
+ response = await _sess .get (url , timeout = 1 )
30
33
if response .status == 200 :
31
34
return True
32
35
except Exception as e :
@@ -37,9 +40,19 @@ async def check_url(self, url):
37
40
38
41
39
42
class ProxyManager :
40
- def __init__ (self ):
41
- self .biffer_proxys = []
42
- self .proxys = []
43
+ def __init__ (self , test_url ):
44
+ self .test_url = test_url
45
+ self .queue_proxys = asyncio .Queue ()
46
+
47
+ loop = asyncio .get_event_loop ()
48
+ loop .create_task (self .load_loop ())
49
+
50
+ async def load_loop (self ):
51
+ while True :
52
+ if self .queue_proxys .empty ():
53
+ await self .load_proxys ()
54
+
55
+ await asyncio .sleep (10 )
43
56
44
57
async def load_proxys (self ):
45
58
print ("run load proxy" )
@@ -60,25 +73,32 @@ async def load_proxys(self):
60
73
raise Exception (f'Failed request to "{ url } " url' )
61
74
62
75
text = await response .text ()
63
- x = re .findall (r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,6}' , text )
64
76
65
- pxs = [x .split (':' ) for x in text .split ('\n ' )]
66
- self .biffer_proxys = [ProxyObject (x [0 ], x [1 ]) for x in pxs if len (x ) == 2 ]
67
- print (f'load { len (self .proxys )} proxys' )
77
+ proxys = []
68
78
69
- async def get_proxy (self , test_url ):
70
- print ('getting proxy' )
71
- count_try = 0
72
- while count_try < 10 :
73
- if len (self .proxys ) == 0 :
74
- await self .load_proxys ()
79
+ for line in re .findall (RE_PROXY , text ):
80
+ sp = line .split (':' )
81
+ if len (sp ) != 2 :
82
+ continue
83
+ proxy = ProxyObject (sp [0 ], sp [1 ])
84
+ proxys .append (proxy )
85
+
86
+ print (f'find { len (proxys )} proxys' )
87
+
88
+ while len (proxys ) > 0 :
89
+ batch_proxys = proxys [:20 ]
90
+ proxys = proxys [20 :]
75
91
76
- proxy = self .proxys .pop ()
92
+ checked = await asyncio .gather (* [
93
+ proxy .check_url (self .test_url )
94
+ for proxy in batch_proxys
95
+ ])
77
96
78
- if await proxy .check_url (test_url ):
79
- print (f"Proxy { proxy } find success" )
80
- return proxy
97
+ print (f'checked { len (checked )} proxys' )
81
98
82
- await asyncio .sleep (1 )
99
+ for i , proxy in enumerate (batch_proxys ):
100
+ if checked [i ]:
101
+ self .queue_proxys .put_nowait (proxy )
83
102
84
- raise Exception ("Max count trys getting proxy =(" )
103
+ async def get_proxy (self ):
104
+ return await self .queue_proxys .get ()
0 commit comments