Python 并发编程

一. 为什么需要引入并发编程呢?

场景1:一个网络爬虫,按顺序爬取花了一小时,采用并发下载减少到20 min,场景2:一个app 应用,优化前每次打开页面需要3s,采用异步并发提升到每次200ms;引入并发,就是为了提升程序运行的速度,有哪些程序提升速度的方法呢?

① 单线程串行:由CPU 和 IO 轮流执行;② 多线程并发(threading)③ 多CPU 并行(multiprocessing)④ 多机器并行;

python 对于并发编程的支持:

  1. 多线程:threading,利用CPU 和 IO 可以同时执行的原理,让CPU 不会干巴巴等待IO 完成;
  2. 多进程:multiprocessing,利用多核CPU的能力,真正并行执行任务;
  3. 异步IO:asyncio,在单线程利用CPU 和 IO 同时执行的原理,实现函数的异步执行;
  4. 使用 Lock 对资源进行加锁,防止冲突访问;python 也提供了Queue 实现不同线程/进程之间的数据通信 ,实现生产者--消费者模式 
  5. 使用线程池Pool/进程池Pool,简化线程/进程任务提交,等待结果、获取结果;
  6. 使用subprocess 启动外部程序的进程,并进行输入输出的交互;

二. 怎么样选择多线程,多进程和多协程?

python 并发编程有三种模式:1. 多线程Thread;2. 多进程Process;3. 多协程Corroutine;思考一下三个问题:1. 什么是CPU密集型计算,IO 密集型计算;2. 多线程,多进程和多协程的区别;3. 怎么样根据任务选择对应技术?

1. CPU 密集型与 I/O 密集型

CPU 密集型(CPU-bound):CPU 密集型也叫做计算密集型,是指I/O在很短时间内就可以完成,CPU 需要大量的计算和处理,特点是CPU 占用率相当高;例如:压缩解压缩、加密解密、正则表达式搜索;I/O密集型(I/O bound):IO 密集型指的是系统运作大部分的状况是CPU 在等I/O(硬盘,内存)的读写操作,CPU 占用率较低,例如:文件处理程序,网络爬虫程序,读写数据库程序;

2. 多线程,多进程和多协程的对比

一个进程中可以启动 N 个进程,一个线程中可以启动 N 个协程;多进程,多线程,多协程三种技术中只有多进程能够同时利用多核cpu并行计算;

多进程 Process (multiprocessing):优点:可以利用多核cpu 并行计算;缺点:占用资源最多,可以启动的数目比线程要少;适用于cpu 密集型计算;

多线程 Thread (threading):优点:相比进程,更加轻量级,占用资源较少;缺点:相比进程:多线程只能够并发执行,不能够利用多CPU(GIL),这是python 多线程一个很大的缺点,同一时间只能够使用一个cpu,相比于协程:启动数目有限,占用内存资源,有线程切换开销,多线程适用于IO密集行型计算,同时运行的任务数目要求不多;

多协程 Coroutine (asyncio):优点:内存开销最小,启动协程数目最多;缺点:支持的库有限制,代码实现复杂;很多库函是不支持协程的(aiohttp);适用于:IO 密集型计算,需要超多任务运行,但是有现成库支持的场景;

三. python 的全局解释器锁(GIL)

1. python 速度慢的两大原因

相比对c++/java,python 确实比较慢,在一些特殊场景下,python 比c++ 慢100~200倍;由于速度慢的原因,很多公司的基础架构代码仍然使用c/c++开发。python 速度慢的两大原因:① python 属于动态类型语言,边解释边执行;② python 中由于存在GIL,无法利用多核cpu 并发执行。

2. GIL 是什么?

全局解释器锁(GIL):是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻只有一个线程在执行;即使在多核处理器上,使用GIL 的解释器也只允许同一时间执行一个线程;

Python 并发编程_第1张图片

所以当GIL存在的时候,即使电脑有多核cpu,单个时刻也只能够执行1个,相比并发加速的c++/java 所以慢;

3. 为什么会有GIL 这个东西?

简而言之,python 设计初期,为了规避并发问题引入了GIL,为了解决多线程之间数据完整性和状态同步问题;python 中对象的管理,是使用引用计数器进行的,引用数为0则释放对象,但是GIL 确实有好处,简化了python对于共享资源的管理;

4. 怎么样规避GIL 带来的限制?

多线程 threading 机制依然是有用的,用于IO 密集型计算,因为在IO期间,线程会释放GIL,实现CPU 和IO的并行,因此多线程用于IO密集型计算依然可以大幅度提升速度,但是多线程用于CPU密集型计算的时候,只会更加拖慢速度;使用multiprocessing 的多进程机制实现并行计算,利用多核cpu 的优势,为了应对GIL 的问题,python 提供了multiprocessing。

四. 使用多线程加速python 爬虫

使用多线程,python 爬虫被加速10倍;

1. python 创建线程的方法;

① 准备一个函数: func (a,b);② 怎样创建一个线程;threading.Thread(target=func, args=(100,200)); ③ 启动线程:t.start();4. 等待结束:t.join();

2. 改写爬虫程序,变成多线程爬取;

使用多线程爬取发现爬取的速度快了将近10倍,我们可以创建一个blog_spider.py和multi_thread_craw.py 文件,blog_spider.py 文件调用requests 库中的get()方法,multi_thread_craw.py 分别使用单线程和多线程进行爬取

blog_spider.py:

import requests

# urls存储爬取页面的url, 以f开头表示在字符串内支持大括号内的python表达式(列表表达式生成列表的每一个元素)
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 51)]


# 爬取函数
def craw(url: str):
    # get()方法给url 对应的服务器发送一个请求, 返回值为Response 类
    r = requests.get(url)
    print(url, len(r.text))

multi_thread_craw.py:

import threading
import time

import blog_spider


def single_thread():
    print("single-thread begin")
    for url in blog_spider.urls:
        blog_spider.craw(url)
    print("single-thread end")


def multi_thread():
    print("multi-thread begin")
    # threads 存储线程对象
    threads = list()
    for url in blog_spider.urls:
        threads.append(
            # target 传递的是调用的函数, 当前调用的函数需要开启线程来处理, args为函数的参数, 需要传递一个元祖类型
            threading.Thread(target=blog_spider.craw, args=(url,))
        )
    for thread in threads:
        # 使用start()启动线程
        thread.start()
    # 使用join()函数等待结束
    for thread in threads:
        thread.join()
    print("multi-thread end")


if __name__ == "__main__":
    # 计算使用单线程爬取的耗时
    start = time.time()
    single_thread()
    end = time.time()
    print("single-thread cost: ", end - start, " seconds")
    
    # 计算使用多线程爬取的耗时
    start = time.time()
    multi_thread()
    end = time.time()
    print("multi-thread cost: ", end - start, " seconds")

根据输出结果可以发现单线程是按照顺序爬取的,而多线程是并发执行的,没有顺序的:

Python 并发编程_第2张图片

Python 并发编程_第3张图片

五. 生产消费者爬虫

1. 多组件的Pipeline 技术架构

复杂的事情一般不会一下子做完,而是会分很多中间的步骤来完成;

Python 并发编程_第4张图片

2. 生产者消费者的爬虫架构

Python 并发编程_第5张图片 3. 多线程数据通信的queue.Queue

queue.Queue 可以用于多线程之间的,线程安全的数据通信

import queue

q = queue.Queue()
# 添加元素, 当队列满的时候会一直等待直到有人取出元素
q.put(elem)
# 当队列为空的时候使用get()会一直阻塞直到队列中有元素之后
elem = q.get()

4. 生产者消费者爬虫

新建一个blog_spider.py,producer_consumer_spider.py:

可以查看BeautifulSoup第三方库的帮助文档,里面有详细的用法,这个库主要用来解析html 页面中的内筒,blog_spider.py:

import requests
from bs4 import BeautifulSoup

# urls存储爬取页面的url, 以f开头表示在字符串内支持大括号内的python表达式
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 51)]


# 爬取函数
def craw(url: str):
    r = requests.get(url)
    return r.text


def parse(html):
    soup = BeautifulSoup(html, "html.parser")
    # links = soup.find_all("a", "post-item-title")
    # 当html 文档需要获取对应的css属性对应的标签的时候这个时候可以使用 css属性名字_= "属性值"的方式
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]

producer_consumer_spider.py:

import queue
import blog_spider
import time
import random
import threading

# do_craw方法用来获取url_queue队列中url的值并爬取url地址对应的html内容并将其放入到html_queue队列中
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        html_queue.put(html)
        print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize())
        time.sleep(random.randint(1, 2))

# do_parse方法用来获取爬取的html_queue的html内容并将解析, 将解析的结果写入到文件中
def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        results = blog_spider.parse(html)
        for res in results:
            fout.write(str(res) + "\n")
        print(threading.current_thread().name, f"result.size", len(results), "html_queue.size=", html_queue.qsize())
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in blog_spider.urls:
        url_queue.put(url)
    fout = open("data.txt", "w")
    # 开启三个线程来处理这些爬取html的函数
    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
        t.start()
    # 开启两个线程解析爬取的html页面的内容
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
        t.start()

上面的生产者与消费者模型类似于go 语言的多协程并发执行,启动线程来调用函数的时候会根据cpu的调度轮流执行,生产者不断解析队列中url 爬取的hmtl 内容并将起放入到队列中,消费者不不停地取出生产者生产的存储在队列中的内容,如果没有内容则会等待,直到消费者消费了队列中的内容或者生产者生产了内容,其实就是多线程并发执行的思想。

六. python 线程安全问题以及解决方案

1. 线程安全的概念

线程安全是指某个函数,函数库在多线程环境中被调用的时候,能够正确地处理多个线程之间的共享变量;由于线程的执行随时可能会发生切换,就造成了不可预料的结果,出现线程不安全的结果;

2. python 可以使用 Lock 类来解决线程安全问题

存在两种方式对发生线程安全的代码进行加锁:① try-finally 模式;② with 模式;

# 用法1
import threading

lock = threading.Lock()
lock.acquire()
try:
    ...

finally:
    lock.release()

# 用法2
import threading
lock = threading.Lock()
with lock:
    ...

下面是一个取钱的例子,线程a和线程b是并发执行的,一开始的时候轮到线程a执行发现1000 >= 800 成立所以执行if 判断中的语句,此时可能还没有执行到account.balance -= amount 然后切换到第二个线程b 执行,此时余额还是1000 满足if 判断所以也会进入到if 判断,然后可能切换到线程a,执行account.balance -= amount 此时余额已经变为200了,而对于线程b 来说已经进入了if 判断然后继续执行 account.balance -= amount,导致余额变为了-600,这样在并发执行的过程中就发生了线程安全问题,发生了不可预料的结果,我们可以在进入if 判断之后加上time.sleep()这样在线程轮流切换的时候会更容易出现问题:

import threading
import time


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    if account.balance >= amount:
        # 调用sleep()大概率会出现问题, 因为此时会发生线程的切换, 那么就一定会进入if判断的语句那么两个线程都会balance 操作就出现了问题
        time.sleep(0.1)
        print(threading.current_thread().name + "取钱成功")
        account.balance -= amount
        print(threading.current_thread().name + " 余额: ", account.balance)

    else:
        print(threading.current_thread().name, "取钱失败, 余额不足")


if __name__ == '__main__':
    account = Account(1000)
    # 使用线程a, b来执行draw 函数
    ta = threading.Thread(name="a", target=draw, args=(account, 800))
    tb = threading.Thread(name="b", target=draw, args=(account, 800))
    # 使用start()会启动一个线程
    ta.start()
    tb.start()
# 有可能输出错误的结果, 也有可能输出正确的结果
a取钱成功
b取钱成功
b 余额:  200
a 余额:  -600

# 不够大部分的情况下输出的答案都是正确的, 这就是多个线程共同操作共享变量的时候发生的线程安全问题

a取钱成功
a 余额:  200
b 取钱失败, 余额不足

对发生线程安全的代码进行加锁解决线程安全问题:

import threading
import time

# 获取锁对象
lock = threading.Lock()


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    # 使用with关键字对发生线程安全的代码进行加锁, 当一个线程执行完成之后才会释放锁另外一个线程才可以获取到锁
    with lock:
        if account.balance >= amount:
            # 调用sleep()大概率会出现问题
            time.sleep(0.1)
            print(threading.current_thread().name + "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name + " 余额: ", account.balance)

        else:
            print(threading.current_thread().name, "取钱失败, 余额不足")


if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(name="a", target=draw, args=(account, 800))
    tb = threading.Thread(name="b", target=draw, args=(account, 800))
    ta.start()
    tb.start()

七. Python 线程池 ThreadPoolExecutor

1. 线程池的原理

Python 并发编程_第6张图片

新建线程需要系统分配资源,终止线程需要系统回收资源,如果可以重用线程,则可以减去新建/终止的开销,线程池就是这样一个概念: 

Python 并发编程_第7张图片

2. 使用线程池的好处

① 提升了性能:因为减去了大量新建,终止线程的开销,重用了线程资源;② 适用场景:处理处理突发性大量请求或者需要大量线程完成任务,但是实际上任务处理时间较短;

3. 防御功能:能够有效避免系统因为创建线程过多,而导致的系统负荷过大相应变慢等问题;

3. ThreadPoolExecutor 的使用,使用线程池改造爬虫程序

新建thread_pool.py:

import concurrent.futures
import blog_spider

with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(blog_spider.craw, blog_spider.urls)
    htmls = list(zip(blog_spider.urls, htmls))
    for url, html in htmls:
        print(url, len(html))
print("craw over")

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = dict()
    # 使用submit的方式
    for url, html in htmls:
        future = pool.submit(blog_spider.parse, html)
        # future与url建立一个关系
        futures[future] = url
    for future, url in futures.items():
        print(url, future.result())
    # as_completed 哪一个任务先执行完就返回哪一个结果
    # for future in concurrent.futures.as_completed(futures):
    #     url = futures[future]
    #     print(url, future.result())
print("parse over")

八. 在web 服务中使用线程池加速

1. web 服务的架构以及特点

Python 并发编程_第8张图片

web 后台服务的特点:

① web 服务对响应时间要求非常高,比如要求200ms 返回;② web 服务有大量的依赖IO操作的调用,比如磁盘文件,数据库,远程API(可以使用线程池的技术加速);③ web 服务经常需要处理几万人,几百万人同时请求;

2. 使用线程池ThreadPoolExecutor 加速

① 方便将磁盘文件,数据库,远程API的IO 调用并发执行;② 线程池的线程数目不会无限创建(导致系统挂掉);具有防御功能;

3. 代码用Flask 实现web 服务并实现加速

原来的程序:

import json
import time

import flask

app = flask.Flask(__name__)

# 使用time.sleep()方法模拟读取文件的操作, 其余两个方法也是类似的
def read_file():
    time.sleep(0.1)
    return "file_read result"


def read_db():
    time.sleep(0.2)
    return "db_read result"


def read_api():
    time.sleep(0.6)
    return "api_read result"

# 映射的url
@app.route("/")
def index():
    result_file = read_file()
    result_db = read_db()
    result_api = read_api()
    #返回json字符串
    return json.dumps({
        "result_file": result_file,
        "result_db": result_db,
        "result_api": result_api
    })


if __name__ == '__main__':
    # 启动flask 服务
    app.run()

使用cmd 命令行访问:curl http://127.0.0.1:5000/

改进:这三种操作都是IO操作,可以使用线程池进行加速,因为三个读取IO操作的时间总共为600ms,使用线程池加速之后那么相当于并发执行最终只需要花费读取IO操作最长的时间为300ms即可完成:

import json
import time
from concurrent.futures import ThreadPoolExecutor
import flask
# 创建一个全局的pool
pool = ThreadPoolExecutor()

app = flask.Flask(__name__)


def read_file():
    time.sleep(0.1)
    return "file_read result"


def read_db():
    time.sleep(0.2)
    return "db_read result"


def read_api():
    time.sleep(0.6)
    return "api_read result"


@app.route("/")
def index():
    # 使用submit()函数返回的是Future对象
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)
    return json.dumps({
        "result_file": result_file.result(),
        "result_db": result_db.result(),
        "result_api": result_api.result()
    })


if __name__ == '__main__':
    # 启动flask 服务
    app.run()

九. 使用多进程 multiprocessing 加速程序的运行

1. 有了多线程threading,为什么还要用多进程multiprocessing,虽然对于IO 密集型来说多线程确实可以实现加速的效果,因为当遇到IO的时候cpu会切换到另外的线程,此时当前的线程可以执行当前的IO操作,这样就实现了多线程加速的效果,但是当遇到的是cpu 密集型来说,没有IO的操作,当一个线程执行一段时间之后那么切换到其他的线程,此时会发生线程切换的消耗,非但没有实现加速的效果,返回减慢了运行速度;而multiprocessing 模块就是为了python 为了解决GIL 缺陷而引入的一个模块,原理是在用多进程在多cpu 上并行执行;

Python 并发编程_第9张图片

2. 多进程multiprocessing 知识梳理

Python 并发编程_第10张图片

3. 单线程,多线程,多进程对比 CPU密集计算速度

cpu 密集型计算:100次判断大数字是否是素数的计算,由于GIL 的存在,多线程比单线程计算还慢,而多进程可以明显加快执行速度

你可能感兴趣的