proxyspider.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- author by pekingzcc -*-
# -*- date : 2017-05-19 -*-
import requests
import Queue
import re
from lxml import etree
from lxml import html
from random import choice
import threading
import time
import qiniuupload
from config import (
    PROXY_SITES_BY_REGX, PROXY_SITES_BY_XPATH, OUTPUT_FILE, USER_AGENT_LIST,
    RETRY_NUM, TIME_OUT, TEST_URL, CHECK_PROXY_XPATH
)
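# The config module is expected to provide the names imported above. A rough
# sketch of their shapes, inferred from how they are used in this file (the
# concrete values below are placeholders, not part of the original project):
#
#   PROXY_SITES_BY_REGX  = {"urls": ["http://..."], "proxy_regx": r"\d+\.\d+\.\d+\.\d+:\d+"}
#   PROXY_SITES_BY_XPATH = [{"urls": ["http://..."], "ip_xpath": "...", "port_xpath": "..."}]
#   CHECK_PROXY_XPATH    = {"HTTP_VIA": "...", "HTTP_X_FORWARDED_FOR": "..."}
#   USER_AGENT_LIST      = ["Mozilla/5.0 ...", ...]
#   OUTPUT_FILE and TEST_URL are a file path and a proxy-judge URL;
#   RETRY_NUM and TIME_OUT are an int and a timeout in seconds.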

class Proxy(object):
    """A single proxy record: ip:port, protocol type and anonymity level."""

    def __init__(self, ip_port, anonymity_level, proxy_type='HTTP'):
        self.ip_port = ip_port
        self.proxy_type = proxy_type
        self.anonymity_level = anonymity_level

    def __eq__(self, other):
        if isinstance(other, Proxy):
            return ((self.ip_port == other.ip_port) and
                    (self.proxy_type == other.proxy_type) and
                    (self.anonymity_level == other.anonymity_level))
        else:
            return False

    def __hash__(self):
        return hash(self.ip_port + " " + self.proxy_type + " " + self.anonymity_level)

class ProxySpider(object):
    """Proxy IP spider: collects proxies from the configured sites and keeps the working ones."""

    def __init__(self):
        self.fetch_finish = False
        self.proxy_queue = Queue.Queue()
        self.lock = threading.Lock()
        self.good_proxy = set()
"""
起一个线程将采集到的所有代理IP写入一个queue中
"""
def in_proxy_queue(self):
'''根据正则直接获取代理IP 部分'''
for site in PROXY_SITES_BY_REGX['urls']:
resp = self._fetch(site)
if resp is not None and resp.status_code == 200:
try:
proxy_list = self._extract_by_regx(resp)
for proxy in proxy_list:
print "Get proxy %s and get into queue" % (proxy)
self.proxy_queue.put(proxy)
except Exception as e:
continue
'''根据xpath 获取代理IP 部分'''
for sites in PROXY_SITES_BY_XPATH:
for site in sites['urls']:
resp = self._fetch(site)
if resp is not None and resp.status_code == 200:
try:
proxy_list = self._extract_by_xpath(resp, sites['ip_xpath'], sites['port_xpath'])
for proxy in proxy_list:
print "Get proxy %s and get into queue" % (proxy)
self.proxy_queue.put(proxy)
except Exception as e:
continue
print "Get all proxy in queue!"
self.fetch_finish = True
"""
起多个线程取出queue中的代理IP 测试是否可用
"""
def out_proxy_queue(self):
while not self.proxy_queue.empty() or self.fetch_finish == False:
print "Begin to get proxy from queue"
proxy = self.proxy_queue.get()
check_proxy = self._fetch(TEST_URL, proxy)
if check_proxy is not None and check_proxy.status_code == 200:
resp_str = html.fromstring(check_proxy.text)
http_via = resp_str.xpath(CHECK_PROXY_XPATH['HTTP_VIA'])
http_x_forward_for = resp_str.xpath(CHECK_PROXY_XPATH['HTTP_X_FORWARDED_FOR'])
if http_via[0] == "anonymous / none" and http_x_forward_for[0] == "anonymous / none":
kwargs = {"ip_port": proxy, "anonymity_level": "Elite"}
elif http_via[0] and http_x_forward_for[0]:
kwargs = {"ip_port": proxy, "anonymity_level": "Anonymous"}
else:
kwargs = {"ip_port": proxy, "anonymity_level": "Transparent"}
proxy_instance = Proxy(**kwargs)
self._deduplicate_proxy(proxy_instance)
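    # Note on the classification above: it assumes TEST_URL is a proxy-judge page
    # that echoes the HTTP_VIA and HTTP_X_FORWARDED_FOR request headers it saw,
    # reporting "anonymous / none" when a header is absent. Under that assumption,
    # an elite proxy leaks neither header, an anonymous proxy sends both but hides
    # the real client IP, and a transparent proxy exposes the client IP.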
""" 抓取代理网站函数"""
def _fetch(self, url, proxy=None):
kwargs = {
"headers": {
"User-Agent": choice(USER_AGENT_LIST),
},
"timeout": TIME_OUT,
"verify": False,
}
resp = None
for i in range(RETRY_NUM):
try:
if proxy is not None:
kwargs["proxies"] = {
"http": proxy}
resp = requests.get(url, **kwargs)
break
except Exception as e:
print "fetch %s failed!\n%s , retry %d" % (url, str(e), i)
time.sleep(1)
continue
return resp
""" 根据解析抓取到的内容,得到代理IP"""
def _extract_by_regx(self, resp):
proxy_list = []
if resp is not None:
proxy_list = re.findall(PROXY_SITES_BY_REGX['proxy_regx'], resp.text)
return proxy_list
def _extract_by_xpath(self, resp, ip_xpath, port_xpath):
#import pdb;pdb.set_trace()
proxy_list = []
if resp is not None:
resp = html.fromstring(resp.text)
ip_list = resp.xpath(ip_xpath)
port_list = resp.xpath(port_xpath)
for i in range(len(ip_list)):
proxy = ip_list[i] + ":" + port_list[i]
proxy_list.append(proxy)
return proxy_list
""" 输出可用的代理IP 到 set 中以达到去重"""
def _deduplicate_proxy(self, proxy):
if not proxy:
return
with self.lock:
self.good_proxy.add(proxy)
""" 持久化可用代理IP """
def output_proxy(self):
with open(OUTPUT_FILE, "w+") as proxy_file:
proxy_file.write("proxy type anonymity_level\n" )
for proxy in self.good_proxy:
print "Write %sto proxy_list.txt" % proxy
proxy_file.write("%s %s %s\n" % (proxy.ip_port, proxy.proxy_type, proxy.anonymity_level))
"""一个线程用于抓取,多个线程用于测试"""
def run(self):
threads = []
in_proxy_queue_thread = threading.Thread(target = self.in_proxy_queue)
out_proxy_queue_threads = [threading.Thread(target = self.out_proxy_queue) for i in range(50)]
threads.append(in_proxy_queue_thread)
threads.extend(out_proxy_queue_threads)
[thread.start() for thread in threads]
[thread.join() for thread in threads]
"""最终输出可用代理IP"""
self.output_proxy()


def main():
    spider = ProxySpider()
    spider.run()
    print "fetch is over, begin to upload!"
    # After the crawl finishes, upload the result to Qiniu.
    # Note: configure the Qiniu credentials before enabling the upload.
    #uploadtoqiniu = qiniuupload.uploadToqiniu()
    #uploadtoqiniu.upload()


if __name__ == "__main__":
    main()
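
# Example run (a sketch; assumes Python 2 with requests and lxml installed, and
# a filled-in config.py next to this file):
#
#   $ python proxyspider.py
#
# OUTPUT_FILE then holds one line per working proxy, in the format written by
# output_proxy() above, e.g. (the address below is only a placeholder):
#
#   proxy type anonymity_level
#   1.2.3.4:8080 HTTP Elite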