-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
371 lines (305 loc) · 15.4 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
import argparse
import time
import json
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.firefox import GeckoDriverManager
from bs4 import BeautifulSoup
from tqdm import tqdm
from jinja2 import Environment, FileSystemLoader
import re
import platform
import os
class YouTubeCommentScraper:
    """Scrape a YouTube channel's video list and per-video comments.

    The scraper drives a headless Firefox instance through Selenium. The
    typical flow (see the methods below) is:

    1. navigate ``self.driver`` to a channel's ``/videos`` page,
    2. call :meth:`get_video_list` to collect the videos on that page,
    3. call :meth:`process_video` for each video URL,
    4. call :meth:`generate_report` to render the collected data to HTML,
    5. call :meth:`close` to shut the browser down.

    Attributes:
        driver: The Selenium Firefox WebDriver created by :meth:`setup_driver`.
        wait: A ``WebDriverWait`` bound to ``driver`` with a 10 second timeout.
        max_videos: Always ``None``; it is not read anywhere in this class.
    """

    def __init__(self):
        """Start the browser immediately; propagates any driver start-up error."""
        self.setup_driver()
        self.max_videos = None  # No cap on the number of videos

    def setup_driver(self):
        """Create the headless Firefox driver and the default wait helper.

        Raises:
            Exception: Whatever the driver download or browser start-up raised;
                it is re-raised after a hint is printed.
        """
        firefox_options = Options()
        firefox_options.add_argument('--headless')  # Run without a visible window
        firefox_options.set_preference("intl.accept_languages", "zh-CN, zh")

        try:
            # GeckoDriverManager downloads and caches the Firefox driver binary.
            driver_path = GeckoDriverManager().install()
            print(f"Firefox Driver 路径: {driver_path}")
            service = Service(driver_path)
            self.driver = webdriver.Firefox(service=service, options=firefox_options)
        except Exception as e:
            print(f"设置 Firefox 驱动时出错: {str(e)}")
            print("请确保已安装 Firefox 浏览器")
            raise

        self.wait = WebDriverWait(self.driver, 10)

    def get_video_list(self):
        """Collect the videos shown on the page currently loaded in the driver.

        The page is scrolled until three consecutive scroll rounds add no new
        ``ytd-rich-item-renderer`` elements, or 50 rounds have been made.

        Returns:
            list: One dict per unique video URL with the keys ``url``,
            ``title`` and ``thumbnail``. Items without a link or a thumbnail
            are skipped. On failure the videos gathered so far (possibly an
            empty list) are returned instead of raising.
        """
        videos = []
        seen_urls = set()  # URLs already added, used for de-duplication

        try:
            # Wait for the grid that holds the video tiles.
            print("等待页面加载...")
            container_selector = "ytd-rich-grid-renderer"
            try:
                WebDriverWait(self.driver, 20).until(
                    EC.presence_of_element_located((By.TAG_NAME, container_selector))
                )
            except TimeoutException:
                print("无法找到视频列表容器,可能是页面结构已更改或加载失败")
                return videos

            time.sleep(3)  # Extra settle time after the container appears

            print("开始加载视频列表...")
            last_video_count = 0
            no_new_videos_count = 0
            max_attempts = 50  # Upper bound on the number of scroll rounds

            with tqdm(desc="加载视频列表") as pbar:
                # Stop after three consecutive rounds without new tiles.
                while no_new_videos_count < 3 and max_attempts > 0:
                    # Scroll to the bottom to trigger lazy loading.
                    self.driver.execute_script("""
                        window.scrollTo({
                            top: document.documentElement.scrollHeight,
                            behavior: 'smooth'
                        });
                    """)
                    time.sleep(2)

                    try:
                        video_elements = self.driver.find_elements(
                            By.CSS_SELECTOR, "ytd-rich-item-renderer"
                        )
                        current_video_count = len(video_elements)

                        if current_video_count > last_video_count:
                            # Advance the progress bar by the newly loaded tiles.
                            pbar.update(current_video_count - last_video_count)
                            last_video_count = current_video_count
                            no_new_videos_count = 0
                        else:
                            no_new_videos_count += 1

                        max_attempts -= 1
                    except Exception as e:
                        print(f"滚动加载时出错: {str(e)}")
                        break

            print(f"\n已加载 {last_video_count} 个视频元素")

            # Re-query so the final pass sees every tile that was loaded.
            video_elements = self.driver.find_elements(
                By.CSS_SELECTOR, "ytd-rich-item-renderer"
            )

            for video in tqdm(video_elements, desc="处理视频信息"):
                try:
                    # Read link, title and thumbnail in one round-trip to the browser.
                    video_data = self.driver.execute_script("""
                        function getVideoData(element) {
                            const thumbnail = element.querySelector('#thumbnail img');
                            const link = element.querySelector('a#video-title-link');
                            const title = link ? link.getAttribute('title') : '';
                            if (!thumbnail || !link) return null;
                            const thumbnailUrl = thumbnail.src ||
                                thumbnail.dataset.src ||
                                (thumbnail.srcset ? thumbnail.srcset.split(' ')[0] : null);
                            return {
                                url: link.href,
                                title: title,
                                thumbnail: thumbnailUrl
                            };
                        }
                        return getVideoData(arguments[0]);
                    """, video)

                    if video_data and video_data['url'] and video_data['thumbnail']:
                        video_url = video_data['url']
                        if video_url not in seen_urls:
                            seen_urls.add(video_url)
                            videos.append(video_data)
                except Exception as e:
                    print(f"处理视频元素时出错: {str(e)}")
                    continue

            print(f"\n找到 {len(videos)} 个唯一视频")
            return videos

        except Exception as e:
            print(f"获取视频列表时出错: {str(e)}")
            return videos

    def process_video(self, video_url):
        """Open one video page and collect its title and comments.

        Args:
            video_url: URL of the video page to load.

        Returns:
            dict with the keys ``url``, ``title`` and ``comments``, or ``None``
            if anything raised while loading the page or reading the title.
        """
        try:
            self.driver.get(video_url)
            time.sleep(5)  # Give the page time to load

            title = self.driver.find_element(
                By.CSS_SELECTOR, "h1.ytd-video-primary-info-renderer"
            ).text.strip()
            print(f"\n正在处理视频: {title}")

            # Best effort: the result is not checked, get_comments() waits again.
            self.load_comments_section()

            comments = self.get_comments()

            return {
                'url': video_url,
                'title': title,
                'comments': comments
            }
        except Exception as e:
            print(f"处理视频时出错: {str(e)}")
            return None

    def load_comments_section(self):
        """Scroll the comment section into view and wait for the first thread.

        Returns:
            bool: ``True`` once a ``ytd-comment-thread-renderer`` is present,
            ``False`` if any wait timed out or another error occurred.
        """
        try:
            # Wait for the comment section container.
            WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.TAG_NAME, "ytd-comments"))
            )

            # Scrolling it into view is what triggers the comments to load.
            comments_section = self.driver.find_element(By.TAG_NAME, "ytd-comments")
            self.driver.execute_script("arguments[0].scrollIntoView(true);", comments_section)
            time.sleep(3)

            # Wait for the first comment thread.
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "ytd-comment-thread-renderer"))
            )
            return True
        except Exception as e:
            print(f"加载评论区时出错: {str(e)}")
            return False

    def _extract_comments(self):
        """Return a snapshot of every comment and reply currently in the DOM.

        Each entry is a dict with the keys ``author``, ``text``,
        ``like_count`` (a string, ``'0'`` when the counter is empty),
        ``published_at`` and ``is_reply``. Threads or replies whose elements
        cannot be read are skipped inside the script.
        """
        return self.driver.execute_script("""
            const comments = [];
            const threads = document.querySelectorAll('ytd-comment-thread-renderer');
            threads.forEach(thread => {
                try {
                    // 获取主评论
                    const mainComment = thread.querySelector('#comment');
                    if (!mainComment) return;
                    const author = mainComment.querySelector('#author-text').textContent.trim();
                    const content = mainComment.querySelector('#content-text').textContent.trim();
                    const likes = mainComment.querySelector('#vote-count-middle').textContent.trim() || '0';
                    const time = mainComment.querySelector('#published-time-text').textContent.trim();
                    comments.push({
                        author: author,
                        text: content,
                        like_count: likes,
                        published_at: time,
                        is_reply: false
                    });
                    // 获取回复
                    const replies = thread.querySelectorAll('ytd-comment-renderer.ytd-comment-replies-renderer');
                    replies.forEach(reply => {
                        try {
                            const replyAuthor = reply.querySelector('#author-text').textContent.trim();
                            const replyContent = reply.querySelector('#content-text').textContent.trim();
                            const replyLikes = reply.querySelector('#vote-count-middle').textContent.trim() || '0';
                            const replyTime = reply.querySelector('#published-time-text').textContent.trim();
                            comments.push({
                                author: replyAuthor,
                                text: replyContent,
                                like_count: replyLikes,
                                published_at: replyTime,
                                is_reply: true
                            });
                        } catch (e) {
                            // 忽略单个回复的错误
                        }
                    });
                } catch (e) {
                    // 忽略单个评论线程的错误
                }
            });
            return comments;
        """)

    def get_comments(self):
        """Load and return the comments of the video page currently open.

        The page is scrolled up to 30 times; loading is considered finished
        after three consecutive rounds without new comment threads.

        Returns:
            list: Comment dicts as produced by :meth:`_extract_comments`
            (top-level comments and replies). If an error interrupts loading,
            the most recent snapshot taken before the error is returned.
        """
        comments = []
        try:
            # Wait for the comment section container.
            WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.TAG_NAME, "ytd-comments"))
            )
            time.sleep(3)

            # Bring the comment section into view.
            comments_section = self.driver.find_element(By.TAG_NAME, "ytd-comments")
            self.driver.execute_script("arguments[0].scrollIntoView(true);", comments_section)
            time.sleep(2)

            last_comment_count = 0
            no_new_comments_count = 0
            max_attempts = 30

            for attempt in range(max_attempts):
                # Scroll to the bottom and expand every visible "show replies" button.
                self.driver.execute_script("""
                    window.scrollTo(0, document.documentElement.scrollHeight);
                    // 尝试点击所有的"显示回复"按钮
                    document.querySelectorAll('ytd-button-renderer#more-replies:not([hidden])').forEach(button => {
                        if (button.offsetParent !== null) {
                            button.click();
                        }
                    });
                """)
                time.sleep(2)

                current_comments = self.driver.execute_script("""
                    return document.querySelectorAll('ytd-comment-thread-renderer').length;
                """)
                print(f"\r已加载 {current_comments} 条评论线程", end="")

                if current_comments == last_comment_count:
                    no_new_comments_count += 1
                    if no_new_comments_count >= 3:  # Three idle rounds: assume done
                        break
                else:
                    no_new_comments_count = 0
                    last_comment_count = current_comments

                # Periodic snapshot so a later failure still returns partial data.
                if attempt % 5 == 0:
                    new_comments = self._extract_comments()
                    if new_comments:
                        comments = new_comments

            # Final snapshot: the periodic one above runs only every fifth round
            # and is skipped by the break, so without this step anything loaded
            # after the last snapshot would be missing from the result.
            final_comments = self._extract_comments()
            if final_comments:
                comments = final_comments

            print(f"\n成功获取 {len(comments)} 条评论(包括回复)")
            return comments

        except Exception as e:
            print(f"\n获取评论时出错: {str(e)}")
            return comments

    def generate_report(self, videos):
        """Render ``videos`` into ``report.html`` using the Jinja2 template.

        The template ``report_template.html`` is loaded from ``./templates``
        relative to the current working directory. Errors are printed rather
        than raised.

        Args:
            videos: Data passed to the template as the ``videos`` variable.
        """
        try:
            template_loader = FileSystemLoader(searchpath="./templates")
            # Comment text and titles come from YouTube and are untrusted, so
            # autoescaping is enabled to keep markup in them out of the report.
            template_env = Environment(loader=template_loader, autoescape=True)
            template = template_env.get_template("report_template.html")

            html_content = template.render(videos=videos)

            with open('report.html', 'w', encoding='utf-8') as f:
                f.write(html_content)

            print("\n报告已生成: report.html")
        except Exception as e:
            print(f"生成报告时出错: {str(e)}")

    def close(self):
        """Quit the browser; safe when no driver exists or it was already closed."""
        driver = getattr(self, 'driver', None)
        if driver is None:
            return
        # Clear the reference first so a repeated close() is a no-op even if
        # quit() itself raises.
        self.driver = None
        driver.quit()
def _is_video_url(url):
    """Return True when ``url`` points at a single video rather than a channel."""
    from urllib.parse import urlparse

    parsed = urlparse(url)
    if parsed.netloc.lower().endswith('youtu.be'):
        return True
    path = parsed.path.rstrip('/')
    # Exact match on "/watch" so legacy channel paths that merely start with
    # "watch" are still treated as channels.
    return path == '/watch' or parsed.path.startswith('/shorts/')


def main():
    """Command-line entry point: scrape comments and write the HTML report.

    The single positional argument is a YouTube channel or video URL. For a
    channel, ``/videos`` is appended when missing and every listed video is
    processed; a single-video URL is processed directly. All errors are
    printed, and the browser is always closed on exit.
    """
    parser = argparse.ArgumentParser(description='YouTube评论爬虫')
    parser.add_argument('url', help='YouTube频道或视频URL')
    args = parser.parse_args()

    scraper = YouTubeCommentScraper()
    try:
        target_url = args.url

        if _is_video_url(target_url):
            # Appending "/videos" to a watch URL would produce an invalid
            # address, so a single video skips the channel listing step.
            videos = [{'url': target_url}]
        else:
            channel_url = target_url
            if not channel_url.endswith('/videos'):
                channel_url = channel_url.rstrip('/') + '/videos'

            print(f"正在访问: {channel_url}")
            scraper.driver.get(channel_url)
            time.sleep(5)  # Give the page enough time to load

            videos = scraper.get_video_list()

        if not videos:
            print("未找到任何视频")
            return

        # Collect the comments of every video; failed videos return None and are skipped.
        all_video_data = []
        for video in videos:
            video_data = scraper.process_video(video['url'])
            if video_data:
                all_video_data.append(video_data)

        scraper.generate_report(all_video_data)

    except Exception as e:
        print(f"发生错误: {str(e)}")
    finally:
        scraper.close()


if __name__ == "__main__":
    main()