python爬虫之多线程threading、多进程multiprocessing、协程aiohttp 批量下载图片
对于多任务爬虫来说,多线程、多进程、协程这几种方式处理效率的排序为:aiohttp协程 > 多线程 > 多进程。但是aiohttp协程难度有点复杂,需要了解,而且本人目前没有解决协程下载大尺寸图片不完整的情况,还需要后续继续学习。
一、单线程常规下载
常规单线程执行脚本爬取壁纸图片,只爬取一页的图片。
import datetime
import re
import requests
from bs4 import BeautifulSoup
start = datetime.datetime.now()
j = 0
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'}
def pic_re(url):
re = requests.get(url=url,headers=headers)
return re.text
def pic_download(soup):
list = soup.find(class_='contlistw mtw').find_all('li')
for item in list:
global j
pic_name = item.find('img')['alt']
pic_url = item.find('img')['lazysrc'].replace('.278.154.jpg','')
pic_type = re.sub(r'h.*\d+.','', pic_url)
#print(pic_name,pic_type,pic_url)
filename = '{}.{}'.format(pic_name, pic_type)
print('开始下载:'+filename)
with open(filename, 'wb') as f:
f.write(requests.get(pic_url,headers=headers).content)
print(filename+' 下载完成')
j += 1
def main():
for i in range(1,2):
url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
html=pic_re(url)
soup=BeautifulSoup(html,'lxml')
pic_download(soup)
date_all = (datetime.datetime.now() - start).total_seconds()
print(f'总共{j}张图片,下载总用时:{date_all}s')
if __name__ == '__main__':
main()
执行结果:
开始下载:站在油菜花地的小清新美女背影.jpg
站在油菜花地的小清新美女背影.jpg 下载完成
开始下载:穿大花袖子连衣裙的印度美女近照摄影.jpg
穿大花袖子连衣裙的印度美女近照摄影.jpg 下载完成
总共24张图片,下载总用时:485.885113s
进程已结束,退出代码0
结果,第一页24张图片,就下载差不多8分钟,排除网络等因素,还没有手动下载快。
二、多线程下载
上面的有两个循环,第一个是页面的循环,一页一页的加载,每页在单独循坏单独下载图片。
所以有两个等待时间,第一个就是等待第一页下载完成,才会到第二页。第二个等待就是每页图片一张下载完才下载第二张。
综上,优化两点:
第一点,第一步提取所有图片链接保存,不用一页等一页的提取。
第二点,所有图片多线程同时下载,不用等一个一个下载。
1:创建列表,储存图片信息
只需要两个信息,图片名称,和图片链接,储存到 pic_list = [] 列表。
pic_list = []
def get_pic_list(soup):
list = soup.find(class_='contlistw mtw').find_all('li')
for item in list:
global j
pic_name = item.find('img')['alt']
pic_url = item.find('img')['lazysrc'].replace('.278.154.jpg','')
pic_type = re.sub(r'h.*\d+.','', pic_url)
filename = 'pic\{}.{}'.format(pic_name, pic_type)
pic_list.append([filename,pic_url])
2:读取列表,多线程同时下载
threading说明:
-
创建空列表t_list,将三个子线程放入该列表,用于执行join,
-
执行子线程(start),start方法开启一个新线程。把需要并行处理的代码放在run()方法中,start()方法启动线程将自动调用 run()方法。
-
执行阻塞 (join),join函数可以理解为,如果某个子进程执行了join函数,那么在该子进程执行到join之前,父进程都会等待。
threading.Thread命令参数:
第一个为参数为函数,第二次参数为函数值。
- 使用args 传递参数 threading.Thread(target=target, args=(10, 100, 100)),args参数为元组,所以只有一个参数,以 , 结尾,例:args=(10,)
- 使用kwargs传递参数 threading.Thread(target=target, kwargs={“a”: 10, “b”:100, “c”: 100})
- 同时使用 args 和 kwargs 传递参数 threading.Thread(target=target, args=(10, ), kwargs={“b”: 100,“c”: 100})
分别创建下载函数,和多线程函数。
代码如下:
def image_down(filename,image_url):
re = requests.get(image_url,headers=headers)
print('开始下载:' + filename)
with open(filename, 'wb') as f:
f.write(re.content)
print(filename + ' 下载完成')
def thread_down():
t_list = []
for url in pic_list:
global j
t = threading.Thread(target=image_down,
kwargs={'filename': url[0], 'image_url': url[1]})
t_list.append(t)
t.start()
j += 1
for t in t_list:
t.join()
最终代码为:
import datetime
import re
import requests
from bs4 import BeautifulSoup
import threading
start = datetime.datetime.now()
j = 0
pic_list = []
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'}
def pic_re(url):
re = requests.get(url=url,headers=headers)
return re.text
def get_pic_list(soup):
list = soup.find(class_='contlistw mtw').find_all('li')
for item in list:
global j
pic_name = item.find('img')['alt']
pic_url = item.find('img')['lazysrc'].replace('.278.154.jpg','')
pic_type = re.sub(r'h.*\d+.','', pic_url)
filename = 'pic\{}.{}'.format(pic_name, pic_type)
pic_list.append([filename,pic_url])
def image_down(filename,image_url):
re = requests.get(image_url,headers=headers)
print('开始下载:' + filename)
with open(filename, 'wb') as f:
f.write(re.content)
print(filename + ' 下载完成')
def thread_down():
t_list = []
for url in pic_list:
global j
t = threading.Thread(target=image_down,
kwargs={'filename': url[0], 'image_url': url[1]})
t_list.append(t)
t.start()
j += 1
for t in t_list:
t.join()
def main():
for i in range(1,24):
url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
html=pic_re(url)
soup=BeautifulSoup(html,'lxml')
get_pic_list(soup)
thread_down()
date_all = (datetime.datetime.now() - start).total_seconds()
print(f'总共{j}张图片,下载总用时:{date_all}s')
if __name__ == '__main__':
main()
执行结果,533张图片,只用了差不多3分钟就下载完了
开始下载:pic\超清4K长发少女,高清到毛孔都看的见强烈推荐.png
pic\超清4K长发少女,高清到毛孔都看的见强烈推荐.png 下载完成
开始下载:pic\超高清长发清纯学生妹街拍电脑背景.jpg
pic\超高清长发清纯学生妹街拍电脑背景.jpg 下载完成
开始下载:pic\图书馆的气质少女高清头像壁纸图片-真8K壁纸推荐.png
pic\图书馆的气质少女高清头像壁纸图片-真8K壁纸推荐.png 下载完成
总共533张图片,下载总用时:182.8669s
进程已结束,退出代码0
三、图片不完整解决
以上虽然速度上来了,但是查看图片有下载失败,如0kb,或者图片不完整,半截是灰色的。
原因多半是网络原因,壁纸多半都是大尺寸,容量也大,图片1M到20M不等,经常会加载不全就下载下来或网络访问失败。
对于网络失败的(0kb),让他返回重新访问。用re.status_code == 200判断即可。
对于下载不全的,用其他的方式下载。这里用
image = Image.open(BytesIO(re.content)
因为这种方法,如果图片下载不全,会报错异常OSError: image file is truncated (X bytes not processed),通过捕获异常,同样重新返回执行。
为了防止因为网络原因,陷入死循环,设置返回次数,超过规定次数,则停止返回。
改image_down函数即可。
from PIL import Image
from io import BytesIO
def image_down(filename,image_url):
re = requests.get(image_url,headers=headers)
if re.status_code == 200:
try:
print('开始下载:' + filename)
image = Image.open(BytesIO(re.content))
image.save(filename)
print(filename + ' 下载完成')
except OSError:
count = 1
print('图片不完整,重新下载')
if count <= 5:
return image_down(filename, image_url)
else:
print(filename + ' 下载失败')
count += 1
else:
count = 1
print('网络错误,重新下载')
if count <= 5:
return image_down(filename,image_url)
else:
print(filename+' 下载失败')
count += 1
再次执行,图片全部下载完成,而且没有不全的图片了。
四、多进程下载
除了多线程之外,我们还可以使用多进程来提高爬虫速度
在Python中multiprocessing提供了两个用于多进程的类,即Process和Pool类
- multiprocessing.Process 无法批量开启子进程,可以直接用multiprocesssing.Queue等进行通信
- multiprocessing.Pool 可以批量开启子进程,不能直接用multiprocessing.Queue进行通信,只能通过共享内存,或者用multiprocessing.Manager()进行进程间通信。
Pool仅在内存中分配正在执行的进程,而Process在内存中分配所有任务,因此,当任务数较小时,我们可以使用Process类;当任务数较大时,我们可以使用Pool。
1、Process(用于创建进程)
multiprocessing模块提供了一个Process类来代表一个进程对象。
在multiprocessing中,每一个进程都用一个Process类来表示。
构造方法:Process([group [, target [, name [, args [, kwargs]]]]])
- start():启动进程,并调用该子进程中的p.run()
- run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
- terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
- is_alive():返回进程是否在运行。如果p仍然运行,返回True
- join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
使用方法和多线程threading.Thread的使用方法差不多。
p = Process(target=run_proc, args=('test',))
p.start()
p.join()
如,前面的代码,增加def multiprocess_down()类,同样,同时创建多个子进程,加入列表,子进程并发跑。
from multiprocessing import Process
def multiprocess_down():
p_list = []
for url in pic_list:
global j
p = Process(target=image_down,
kwargs={'filename': url[0], 'image_url': url[1]})
p_list.append(p)
p.start()
j += 1
for p in p_list:
p.join()
...
...
def main():
for i in range(1,3):
url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
html=pic_re(url)
soup=BeautifulSoup(html,'lxml')
get_pic_list(soup)
#thread_down()
multiprocess_down()
date_all = (datetime.datetime.now() - start).total_seconds()
print(f'总共{j}张图片,下载总用时:{date_all}s')
执行结果
开始下载:pic\LED大屏幕前的欧美美女超清桌面壁纸下载.jpg
pic\LED大屏幕前的欧美美女超清桌面壁纸下载.jpg 下载完成
开始下载:pic\手捧窗帘的欧美时尚模特高清壁纸.jpg
pic\手捧窗帘的欧美时尚模特高清壁纸.jpg 下载完成
开始下载:pic\站在油菜花地的小清新美女背影.jpg
pic\站在油菜花地的小清新美女背影.jpg 下载完成
总共48张图片,下载总用时:172.714426s
进程已结束,退出代码0
可以看出,这种多进程,显然没有多线程速度块,虽然也用了多个子进程同时跑,速度提升还是没有多线程速度快。
2、Pool(用于创建管理进程池)
构造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes :要创建的进程数,如果省略,将默认使用cpu_count()返回的数量。
- initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
- initargs:是要传给initializer的参数组。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
- context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
方法:
-
apply(),
函数原型:apply(func[, args=()[, kwds={}]])
该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不再出现) -
apply_async
函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
与apply用法一致,但它是非阻塞的且支持结果返回后进行回调 -
map()
函数原型:map(func, iterable[, chunksize=None])
Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程 -
map_async()
函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的 -
close()
关闭进程池(pool),使其不再接受新的任务 -
terminal()
结束工作进程,不再处理未处理的任务 -
join()
主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用
Pool.apply_async:异步执行,结果的顺序不能保证与调用的顺序相同
Pool.map:同步执行,阻塞直到返回完整的结果,
map和map_async一次调用一个作业列表,但是apply和apply_async只能调用一个作业。但是,apply_async是在后台并行执行作业的。
我这里用apply_async,也可以用map,但是map正常情况只能传一个参数,要用map传多个参数,用starmap即可。
from multiprocessing import Pool
def multipoll_down():
p = Pool(multiprocessing.cpu_count())
for url in pic_list:
global j
p.apply_async(image_down,[url[0],url[1]])
#p.starmap(image_down, [(url[0], url[1]),])
j += 1
p.close()
p.join()
def main():
for i in range(1, 3):
url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
html = pic_re(url)
soup = BeautifulSoup(html, 'lxml')
get_pic_list(soup)
# thread_down()
# multiprocess_down()
multipoll_down()
date_all = (datetime.datetime.now() - start).total_seconds()
print(f'总共{j}张图片,下载总用时:{date_all}s')
执行结果,速度和process子进程并行差不多(我电脑cpu最大8个进程而已)。
五、协程下载
1:aiohttp 和 asyncio库
这里需要用到两个库aiohttp 和 asyncio。
aiohttp :可以把这个库当作 requests库 的替代品,因为requets不支持异步默认,所以这里需要用aiohttp 库替代requests库。
asyncio:asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加async关键。
async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是sleep(5))消失后,也就是5秒到了再回来执行。
asyncio基本流程
声明协程函数,函数前加async:
async def function():
创建任务:
asyncio.create_task(coro, *, name=None)
两种执行任务,简单等待asyncio.wait和并发执行asyncio.gather
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
awaitable asyncio.gather(*aws, return_exceptions=False)
运行 asyncio 程序:
asyncio.run(coro, *, debug=False)
Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作
asyncio.get_event_loop().run_until_complete(main())
import aiohttp
import asyncio
async def images_down(filename, image_url):
async with aiohttp.ClientSession() as session:
async with session.get(image_url,headers=headers) as resp:
#print(filename,resp.status)
if resp.status == 200:
print('开始下载:' + filename)
with open(filename, 'wb') as f:
# 这个地方通过对流的处理,而不是一下子整体读取。
# 一下子整个的读取,会导致批量下载图片的时候,一开始会出现资源浪费,只下载几张图片
while True:
chunk = await resp.content.read(1024)
if not chunk:
break
f.write(chunk)
print('下载完成:' + filename)
else:
print('网络错误,重新下载:'+ filename)
return await images_down(filename,image_url)
async def aiohttp_down():
for url in pic_list:
global j
task = [asyncio.create_task(images_down(url[0],url[1]))]
j += 1
#await asyncio.wait(task)
await asyncio.gather(*task)
def main():
for i in range(1, 3):
url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
html = pic_re(url)
soup = BeautifulSoup(html, 'lxml')
get_pic_list(soup)
loop = asyncio.get_event_loop()
loop.run_until_complete(aiohttp_down())
#asyncio.run(aiohttp_down())
date_all = (datetime.datetime.now() - start).total_seconds()
print(f'总共{j}张图片,下载总用时:{date_all}s')
执行结果:
下载完成:pic\鸽子飞过欧美性感深V装欧美美女高清壁纸.jpg
下载完成:pic\穿黄色衣服在路边黄花的摄影电脑壁纸.jpg
下载完成:pic\棕色色调穿朝鲜传统服饰的韩国高颜值美女.jpg
下载完成:pic\欧美古城街道摄影的欧美美女.jpg
下载完成:pic\沙滩上捧着花散步的长发美女电脑壁纸.jpg
总共48张图片,下载总用时:18.869591s
进程已结束,退出代码0
48张,只用了18秒左右,协程速度最快。
2:遇到的问题
问题一: RuntimeError: Event loop is closed。
asyncio.run()执行报错,所以改成 asyncio.get_event_loop().run_until_complete(main()),就不报错了,有趣的是,官方是推荐使用asyncio.run()的方法。
问题二: 图片下载速度快,但是大尺寸图片下载不完整。
跟多线程遇到的问题一致,1M到20M的图片,显示不完全,小的图片则正常。
尝试解决方法1:
跟多线程一致的方法,或者判断Content-Length值返回循环下载
f os.path.getsize(filename) != int(resp.headers["Content-Length"]):
return await images_down(filename, image_url)
结果:直接陷入死循环,发现所有下载图片基本不可能和Content-Length一致,即使下载完整。
尝试解决方法2:
用块的方式,导入aiofiles包,通过设置块(content.iter_chunked)的值尝试循环写入。
async def async_http_download(filename, image_url):
async with aiofiles.open(filename, 'wb') as fd:
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as resp:
async for chunk in resp.content.iter_chunked(1024):
await fd.write(chunk)
结果,虽然不完整的减少,但是大尺寸的图,还是有,解决未果。
用aiohttp虽然代替requests库使用,估计功能还是没有requests库完整及全面,等后续在找解决方法。
六、总结
对于多任务爬虫来说,多线程、多进程、协程这几种方式处理效率的排序为:aiohttp协程 > 多线程 > 多进程。
但是aiohttp协程难度有点复杂,需要了解,而且本人目前没有解决协程下载大尺寸图片不完整的情况,还需要后续继续学习。
更多推荐
所有评论(0)