# coding:utf-8
import datetime as dt
import re
import time
from urllib import parse

import crawlertool as tool
from selenium import webdriver


class SpiderTwitterAccountPost(tool.abc.SingleSpider):
"""
Twitter爬虫
"""
def __init__(self, driver):
self.driver = driver
self.user_name = None
    @staticmethod
    def get_twitter_user_name(page_url: str) -> str:
        """
        Extract the user name from a URL
        :param page_url: URL of any Twitter account page
        :return: Twitter account user name
        """
        if pattern := re.search(r"(?<=twitter\.com/)[^/]+", page_url):
            return pattern.group()
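    # Illustrative example (not part of the original file):
    #   get_twitter_user_name("https://twitter.com/nasa") -> "nasa"
    # Note: if the URL does not match, the method falls through and returns
    # None despite the `-> str` annotation, so callers may want to check.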

    def running(self, user_name: str, since_date, until_date):
        """Run the tweet crawler for one Twitter account

        :param user_name: Twitter account user name
        :param since_date: earliest date of the crawl range (inclusive)
        :param until_date: latest date of the crawl range (exclusive in Twitter search)
        :return: list of tweet info dicts
        """
        self.user_name = user_name
        item_list = []

        # Build the value of the q (query) parameter from Twitter search operators
        query_sentence = []
        query_sentence.append("from:%s" % user_name)  # only tweets posted by the target user
        query_sentence.append("-filter:retweets")  # filter out all retweets
        if since_date is not None:
            query_sentence.append("since:%s" % str(since_date))  # start of the date range
        if until_date is not None:
            query_sentence.append("until:%s" % str(until_date))  # end of the date range
        query = " ".join(query_sentence)
        params = {
            "q": query,
            "f": "live"  # "live" selects the "Latest" tab of the search results
        }
        actual_url = "https://twitter.com/search?" + parse.urlencode(params)
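        # Illustrative example (values not from the original file): for
        # user_name="nasa" and the window 2021-03-18..2021-03-20, the URL is
        # https://twitter.com/search?q=from%3Anasa+-filter%3Aretweets+since%3A2021-03-18+until%3A2021-03-20&f=live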
self.console("实际请求Url:" + actual_url)
# 打开目标Url
self.driver.get(actual_url)
time.sleep(3)
        # Check whether the account posted nothing in the given time range
        # (the probe string below is the Chinese-locale Twitter UI message for
        # "no results", so this check assumes a Chinese UI language)
        label_test = self.driver.find_element_by_css_selector(
            "main > div > div > div > div:nth-child(1) > div > div:nth-child(2) > div > div")
        if "你输入的词没有找到任何结果" in label_test.text:
            return item_list
        # Locate the outer container of the tweet list
        label_outer = self.driver.find_element_by_css_selector(
            "main > div > div > div > div:nth-child(1) > div > div:nth-child(2) > div > div > section > div > div")
        self.driver.execute_script("arguments[0].id = 'outer';", label_outer)  # give the container an id for the XPath lookup below
        # Repeatedly walk the currently loaded tweets, then scroll to load more
        tweet_id_set = set()
        for _ in range(1000):  # hard cap on the number of scroll rounds
            last_label_tweet = None
            for label_tweet in label_outer.find_elements_by_xpath('//*[@id="outer"]/div'):  # one tweet card per div
                item = {}

                # Read the tweet ID
                try:
                    if label := label_tweet.find_element_by_css_selector(
                            "article > div > div > div > div:nth-child(2) > div:nth-child(2) > div:nth-child(1) > div > div > div:nth-child(1) > a"):
                        if pattern := re.search("[0-9]+$", label.get_attribute("href")):
                            item["tweet_id"] = pattern.group()
                except Exception:
                    pass
                if "tweet_id" not in item:
                    self.log("account: " + user_name + " | tweet ID label not found (tweet #" + str(len(item_list)) + ")")
                    continue
                # Skip tweets already collected in an earlier pass (otherwise parse them)
                if item["tweet_id"] in tweet_id_set:
                    continue
                tweet_id_set.add(item["tweet_id"])
                last_label_tweet = label_tweet
                # Parse the tweet's publication time
                if label := label_tweet.find_element_by_css_selector(
                        "article > div > div > div > div:nth-child(2) > div:nth-child(2) > div:nth-child(1) > div > div > div:nth-child(1) > a > time"):
                    item["time"] = label.get_attribute("datetime").replace("T", " ").replace(".000Z", "")
                # Parse the tweet text
                if label := label_tweet.find_element_by_css_selector(
                        "article > div > div > div > div:nth-child(2) > div:nth-child(2) > div:nth-child(2) > div:nth-child(1)"):
                    item["text"] = label.text
                item["replies"] = 0  # reply count
                item["retweets"] = 0  # retweet count
                item["likes"] = 0  # like count
                # Locate the tweet's engagement-stats label
                if label := label_tweet.find_element_by_css_selector(
                        "article > div > div > div > div:nth-child(2) > div:nth-child(2) > div:nth-child(2) > div[role='group']"):
                    if text := label.get_attribute("aria-label"):
                        # Parse the engagement stats (the keywords below are the
                        # Chinese UI strings for reply / retweet / like)
                        for feedback_item in text.split(","):
                            if "回复" in feedback_item:
                                if pattern := re.search("[0-9]+", feedback_item):
                                    item["replies"] = int(pattern.group())
                            if "转推" in feedback_item:
                                if pattern := re.search("[0-9]+", feedback_item):
                                    item["retweets"] = int(pattern.group())
                            if "喜欢" in feedback_item:
                                if pattern := re.search("[0-9]+", feedback_item):
                                    item["likes"] = int(pattern.group())
                item_list.append(item)
            # Scroll down to the lowest tweet seen in this pass; if the pass
            # produced no new tweet, nothing further is loading, so stop
            if last_label_tweet is not None:
                self.driver.execute_script("arguments[0].scrollIntoView();", last_label_tweet)
                self.console("Scrolled down one page...")
                time.sleep(3)
            else:
                break
        return item_list


if __name__ == "__main__":
    # Reuse the browser's cached profile so the session is already logged in
    driver_options = webdriver.ChromeOptions()
    # Point Chrome at an existing user-data directory to keep the login state
    driver_options.add_argument(r"user-data-dir=C:\Users\loeoe\AppData\Local\Google\Chrome\User Data")
    # driver_options.add_argument("blink-settings=imagesEnabled=false")  # skip loading images to speed up crawling
    # Path to the chromedriver binary (must match the installed Chrome version)
    driver = webdriver.Chrome(
        executable_path=r"E:\Temp\drivers\chromedriver\win32\89.0.4389.23\chromedriver.exe",
        port=0,
        options=driver_options)
    print(SpiderTwitterAccountPost(driver).running(
        user_name=SpiderTwitterAccountPost.get_twitter_user_name("https://twitter.com/nasa"),
        since_date=dt.date(2021, 3, 18),
        until_date=dt.date(2021, 3, 20)
    ))
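    # Illustrative output shape (values are made up, not from the original file):
    # [{"tweet_id": "13726...", "time": "2021-03-18 16:40:00", "text": "...",
    #   "replies": 13, "retweets": 40, "likes": 254}, ...]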