支持功能:
自己选择要下载的视频分类
视频个数(12的倍数,最新的...个)(不要怪不能高度自定义啦~)
重复文件自动跳过
哎,舒服了~
强调:仅供学习练习用,请不要乱来
# 把文件名搞一下 import re import os import requests from concurrent.futures import ThreadPoolExecutor class PearDownLoader: def __init__(self, page_url, params, headers, cookie=None, video_page_target_str_prefix=None, video_page_target_re=None, video_target_re=None, video_name_re=None, video_directory=None): self.headers = headers self.cookie = cookie self.url = page_url self.params = params self.video_page_target_str_prefix = video_page_target_str_prefix self.video_page_target_re = video_page_target_re self.video_target_re = video_target_re self.video_name_re = video_name_re self.video_directory = video_directory self.pool = ThreadPoolExecutor() def get_video_page_urls(self): res = requests.get(self.url, params=self.params, headers=self.headers) video_page_target_str_list = re.findall(self.video_page_target_re, res.text) urls_list = [self.video_page_target_str_prefix + video_page_target_str for video_page_target_str in video_page_target_str_list] print(urls_list) # 至此,已经获取到了推送的几条视频连接地址 return urls_list def get_video_urls(self, video_page_urls): videos_list = [] for video_page_url in video_page_urls: res = requests.get(video_page_url, headers=self.headers) # 获取视频链接 video_target_url = re.findall(self.video_target_re, res.text)[0] # 获取视频名称 video_name = re.findall(self.video_name_re, res.text)[0] video_dic = { "video_target_url": video_target_url, "video_name": video_name, } # print("video_dic:", video_dic) # 组织成 [{url: "filename"}] 的形式 videos_list.append(video_dic) return videos_list def download_videos(self, videos_list): # 先判断有没有传参数,没有给个默认的 if not self.video_directory: self.video_directory = os.path.join(os.path.dirname(__file__), 'pear_download_videos') # 按分类创文件夹存储视频 download_category_name = video_category_dic.get(str(self.params.get('categoryId')), 'unknow_category') self.video_category_directory = os.path.join(self.video_directory, download_category_name) # 判断该路径存不存在,不存在创建一下 if not os.path.exists(self.video_category_directory): os.makedirs(self.video_category_directory) # 用多线程去下载 for video_dic in videos_list: # self.pool.submit(download_video_by_thread, self, video_dic) self.pool.submit(self.download_video_by_thread, video_dic) def download_video_by_thread(self, video_dic): file_name = video_dic.get('video_name', 'video_target_url') absolute_file_path = os.path.join(self.video_category_directory, file_name + '.mp4') # 得到视频内容,写到文件中 if os.path.exists(absolute_file_path): print(f"【{file_name}】 已存在,跳过下载...") return None print(f"【{file_name}】 start download ...") video_url = video_dic.get('video_target_url') res = requests.get(video_url, headers=self.headers) with open(absolute_file_path, 'wb') as f: for line in res.iter_content(): f.write(line) print(f"【{file_name}】 download compelete...") # 一步调用,把上面几个歩鄹合到一起(保留三个歩鄹便于查看每一步的执行结果,排错) def easy_get_videos(self): video_page_urls = self.get_video_page_urls() videos_list = self.get_video_urls(video_page_urls=video_page_urls) self.download_videos(videos_list=videos_list) if __name__ == '__main__': # 视频分类 video_category_dic = { "1": "社会", "2": "世界", "3": "财富", "4": "娱乐", "5": "生活", "6": "美食", "8": "科技", "9": "体育", "10": "新知", "31": "汽车", "59": "音乐", "8889": "旗帜", } # 准备参数 url = "https://www.pearvideo.com/category_loading.jsp" params = { "reqType": 5, # 字符串和数字都要 html 转码,区别不大 "categoryId": 8, # 梨视频 科技 频道 "start": 0, # 0 是懒加载的第一个视频 } headers = { "Referer": "https://www.pearvideo.com/category_8", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36", } video_page_target_str_prefix = "https://www.pearvideo.com/" video_page_target_re = '<a href="http://www.likecs.com/(.*?)">' video_target_re = ',srcUrl="http://www.likecs.com/(.*?)"' video_name_re = '<h1>(.*?)</h1>' video_directory = os.path.join(os.path.dirname(__file__), 'pear_download_videos') pear_downloader = PearDownLoader(url, params, headers, video_page_target_str_prefix=video_page_target_str_prefix, video_page_target_re=video_page_target_re, video_target_re=video_target_re, video_name_re=video_name_re, video_directory=video_directory) while True: while True: print("现有如下视频分类可选择:") for key, value in video_category_dic.items(): print(f"{key}.{value}", end='\t') want_video_type = input("\n请输入您要下载的视频类型编号:").strip() if want_video_type in video_category_dic: want_video_category_id = int(want_video_type) break print("您输入的编号有误,请重新输入!") while True: want_video_pear_count = input("请输入您要下载的视频数量:12 * ").strip() if want_video_pear_count.isdigit(): want_video_pear_count = int(want_video_pear_count) break print("请输入合法数字!") # 开始分批下载 try: for i in range(want_video_pear_count): pear_downloader.params['categoryId'] = want_video_category_id pear_downloader.params['start'] = pear_downloader.params['start'] + 12 pear_downloader.easy_get_videos() except Exception as e: pear_downloader.pool.shutdown() # 关闭池子且等待池子中所有的任务运行完毕 print(e)