生产者与消费者模式

 

目录

1、生产者与消费者模式

2、线程间的通信

3、Queue线程安全队列

4、课堂案例(Queue在多线程中的使用)

5、课堂案例(多线程下载王者荣耀高清壁纸)

1)分析URL

2)爬取第一页壁纸的url

3)存储壁纸

4)多线程下载壁纸


1、生产者与消费者模式

  • 生产者与消费者模式是多线程开发中常见到的一种模式
  • 生产者线程
    • 生产者线程用于“生产”数据
  • 消费者线程
    • 消费者线程用于“消费”数据

生产者与消费者模式_第1张图片

import time
import random
import threading
g_money = 0      # 全局变量
lock=threading.Lock()   # 创建锁对象
class Producer(threading.Thread):
    def run(self):
        global g_money
        for _ in range(10):   # 不需要使用变量,只需要计算次数的时候就可以用下划线_
            lock.acquire()      # 加锁
            money=random.randint(10000,80000)
            g_money+=money
            print(threading.current_thread().getName(),f'挣了{money}元钱,当前余额为{g_money}元')
            time.sleep(1)
            lock.release()      # 释放锁

class Customer(threading.Thread):
    def run(self):
        global g_money
        for _ in range(10):
            lock.acquire()
            money = random.randint(10000, 80000)
            if money<=g_money:
                g_money-=money
                print(threading.current_thread().getName(), f'花了{money}元钱,当前余额为{g_money}元')
            else:
                print(threading.current_thread().getName(), f'想花了{money}元钱,但是余额不足,当前余额为{g_money}元')
            time.sleep(1)
            lock.release()

def start():
    for i in range(5):
        t=Producer(name='生产者{0}'.format(i))
        t.start()

    for i in range(5):
        t=Customer(name='--------消费者{0}'.format(i))
        t.start()

if __name__ == '__main__':
    start()     # 调用自定义的start()函数,创建线程对象,启动线程

2、线程间的通信

  • Condition版的生产者与消费者模式

序号

函数

描述

1

acquire()

上锁

2

release()

解锁

3

wait()

将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify()和notify_all()函数唤醒。被唤醒后会继续等待上锁,上锁后继续执行下面的代码。

4

notify()

通知某个正等待的线程,默认是第1个等待的线程

5

notify_all()

通知所有正等待的线程。notify()和notify_all()需要在release()之前调用

消费者已经余额不足了,就不需要再消费了

import random
import threading
g_money = 0      # 全局变量
lock=threading.Condition()   # 创建Condition对象
g_time=0
class Producer(threading.Thread):
    def run(self):
        global g_money
        global g_time
        for _ in range(10):   # 不需要使用变量,只需要计算次数的时候就可以用下划线_
            lock.acquire()      # 加锁
            money=random.randint(10000,80000)
            g_money+=money
            g_time+=1
            print(threading.current_thread().getName(),f'挣了{money}元钱,当前余额为{g_money}元')
            lock.notify_all()
            lock.release()      # 释放锁

class Customer(threading.Thread):
    def run(self):
        global g_money
        for _ in range(10):
            lock.acquire()
            money = random.randint(80000, 100000)
            while g_money < money:
                if g_time>=10:
                    lock.release()
                    return
                print(threading.current_thread().getName(), f'想花了{money}元钱,但是余额不足,当前余额为{g_money}元')
                lock.wait()     # 余额不足的情况下需要等待生产者赚钱唤醒

            g_money-=money
            print(threading.current_thread().getName(), f'--------共花了{money}元钱,当前余额为{g_money}元')
            lock.release()

def start():
    for i in range(5):
        t=Producer(name='生产者{0}'.format(i))
        t.start()

    for i in range(5):
        t=Customer(name='--------消费者{0}'.format(i))
        t.start()

if __name__ == '__main__':
    start()     # 调用自定义的start()函数,创建线程对象,启动线程

3、Queue线程安全队列

  • Python内置的线程安全的模块叫queue(再也不用加锁解锁了)
    • FIFO(先进先出)队列Queue
    • LIFO(后进先出)队列Queue

序号

函数

描述

1

qsize()

返回队列的大小

2

empty()

判断队列是否为空

3

full()

判断队列是否满了

4

get()

从队列中取最先插入的数据

5

put()

将一个数据放到队列中

from queue import Queue     # FIFO

q=Queue(5)      # 创建一个队列,最多可以存放5个数据
# 向队列中存放数据
for i in range(4):
    q.put(i)
print('队列中实际数据的多少:', q.qsize())     # 队列中实际数据的多少: 4

for _ in range(5):
    try:
        print(q.get(block=False))       # 0  1  2  3
    except:
        print('数据已经取完,队列目前为空')
        break

if q.full():
    print('队列已满')
else:
    print('队列当前数据的个数为:', q.qsize(),'队列不满')   # 个数为: 0 。因为上面已经把数据取完了


q2=Queue(5)
for i in range(6):  # 程序最多放5个,可以现在有6个,最后一个一直处于等待状态,程序结束不了
    try:
        q2.put(i, block=False)
    except:
        print('队列已满')
        break

print('程序结束')

总结:使用block的情况:

1、队很长,元素很少,get使用

2、队很短,元素很多,put使用

4、课堂案例(Queue在多线程中的使用)

  • Queue是线程安全的队列,在使用时无需加锁,可以在多线程中直接使用
  • 队列也是实现线程间同步的方式

from queue import Queue
import random
import time
import threading

def add_value(q):
    while True:
        q.put(random.randint(100,1000))
        time.sleep(1)

def get_value(q):
    while True:
        print('取出了元素:{0}'.format(q.get()))

def start():
    q = Queue(10)
    t1 = threading.Thread(target=add_value, args=(q,))    # 元组后面只有一个元素需要加逗号
    t2 = threading.Thread(target=get_value, args=(q,))
    t1.start()
    t2.start()

if __name__ == '__main__':
    start()

5、课堂案例(多线程下载王者荣耀高清壁纸)

1)分析URL

  • 分析URL
    • 高清壁纸的URL:王者荣耀壁纸下载-王者荣耀官方网站-腾讯游戏
    • 含有数据的URL:
    • 页码范围:一共25页,页码范围为0到24

验证是否是Ajax请求:

1、点击第2页,URL栏地址不变,点击XHR,找到唯一一个,但是看preview发现无内容,说明不是

2、看F12和网页源代码是否是一样的。一样的说明不是Ajax请求。如果不一样,说明数据并没有在服务器端,而是通过另外的请求发回的客户端,在客户端组装的。

找真实的数据源:点击ALL,点击含WorkLis(就是和高清壁纸div板块的class名有点像的)的,复制它的Request URL,粘贴到URL栏。复制获取到的数据,粘贴到json.cn,掐头去尾进行解析。

from urllib import parse    # 可以解析网址
result=parse.unquote('http%3A%2F%2Fshp%2Eqpic%2Ecn%2Fishow%2F2735042018%2F1618915966%5F84828260%5F2160%5FsProdImgNo%5F8%2Ejpg%2F200')
print(result)   # 得到链接,粘贴到URL上,发现是图片,但是图片很小
# http://shp.qpic.cn/ishow/2735042018/1618915966_84828260_2160_sProdImgNo_8.jpg/200

# 在F12Element中找最大图片的URL,对比获得的数据,把末尾的200改成0即可获得最大图片
# http://shp.qpic.cn/ishow/2735042018/1618915966_84828260_2160_sProdImgNo_8.jpg/0

2)爬取第一页壁纸的url

  • 添加请求头参数headers应对反爬
  • urllib下的parse解析URL与解码中文编码

当打印resp.text时,返回的数据需要掐头去尾才是json数据,如果不需要掐头去尾,也可以发现我们的url中有一段与json数据前一样的值,把它删掉重新运行即可。

打印resp.text时:jQuery171032545231727144275_1619310537581( 需要删去

或者删掉url中:&jsoncallback=jQuery171032545231727144275_1619310537581,重新运行

import requests
from urllib import parse

headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
'referer': 'https://pvp.qq.com/'
}

def exact_url(data):        # 提取一个data中的8张壁纸
    image_url_lst=[]
    for i in range(1,9):
        image_url=parse.unquote(data['sProdImgNo_{}'.format(i)].replace('200','0'))
        image_url_lst.append(image_url)
    return image_url_lst

def send_request():
    # url是含有数据的Request URL,即worklist的Request URL
    url='https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=0&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1619310537883'
    resp = requests.get(url, headers=headers)
    # print(resp.text)
    return resp.json()


def parse_json(json_data):
    d={}
    data_lst=json_data['List']
    for data in data_lst:
        image_url_lst=exact_url(data)
        sProdName=parse.unquote(data['sProdName'])
        d[sProdName]=image_url_lst
    for item in d:
        print(item, d[item])

def start():
    json_data=send_request()
    parse_json(json_data)

if __name__ == '__main__':
    start()

3)存储壁纸

  • 路径的拼接
  • os.mkdir()创建文件夹
  • request.urlretrieve(url,path)从url处下载文件并存储到path中

import requests
from urllib import parse
from urllib import request
import os

headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
'referer': 'https://pvp.qq.com/'
}

def exact_url(data):        # 提取一个data中的8张壁纸
    image_url_lst=[]
    for i in range(1,9):
        image_url=parse.unquote(data['sProdImgNo_{}'.format(i)].replace('200','0'))
        image_url_lst.append(image_url)
    return image_url_lst

def send_request():
    # url是含有数据的Request URL,即worklist的Request URL
    url='https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=0&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1619310537883'
    resp = requests.get(url, headers=headers)
    # print(resp.text)
    return resp.json()

def parse_json(json_data):
    d={}
    data_lst=json_data['List']
    for data in data_lst:
        image_url_lst=exact_url(data)
        sProdName=parse.unquote(data['sProdName'])
        d[sProdName]=image_url_lst
    # for item in d:
    #     print(item, d[item])
    save_jsp(d)

def save_jsp(d):
    for key in d:
        # 拼接路径  image/李白-鸣剑·曳影   image/露娜-瓷语鉴心
        dirpath=os.path.join('img/王者荣耀壁纸/', key.strip(' '))
        os.mkdir(dirpath)
        # 下载图片并保存
        for index,image_url in enumerate(d[key]):
            request.urlretrieve(image_url,os.path.join(dirpath,'{}.jpg'.format(index+1)))
            print('{}下载完毕'.format(d[key][index]))

def start():
    json_data=send_request()
    parse_json(json_data)

if __name__ == '__main__':
    start()

4)多线程下载壁纸

  • 生产者线程
    • page_queue.get()
    • image_url_queue.put()
    • 用于生产图片路径
  • 消费者线程
    • image_url_queue.get()
    • 用于下载并存储

# 用于编辑下载路径
import os
# 用来发请求
import requests
# 多线程,用来继承
import threading
# 创建队列,配合多线程
from queue import Queue
# url解码
from urllib import parse
# 用于下载保存文件
from urllib import request

# 全局变量
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
    'referer': 'https://pvp.qq.com/'
}


# 用于壁纸url解码,利用循环迭代获取data数据中8个键值
def exact_url(data):
    image_url_lst = []
    for i in range(1, 9):
        # 获取json中(sProdImgNo_1~9)键值数据(获得9个普通壁纸链接),将url中的200替换为0,以得到高清壁纸链接,解析url
        image_url = parse.unquote(data['sProdImgNo_{}'.format(i)]).replace('200', '0')
        # 将解析好的url存入列表
        image_url_lst.append(image_url)
    return image_url_lst


# 生产者线程
class Producer(threading.Thread):
    """
       1、子类不重写__init__ , 实例化子类时,会自动调用父类定义的__init__
       2、子类重写了__init__时,实例化子类,就不会调用父类已经定义的__init__
    """

    def __init__(self, page_queue, image_url_queue):
        # super()方法在子类中调用父类__init__()方法
        super().__init__()
        self.page_queue = page_queue
        self.image_url_queue = image_url_queue

    # 重写run()方法
    def run(self):
        # 判断,当队列不为空时!(empty()判断队列是否为空)
        while not self.page_queue.empty():
            # 从队列1中取出url开始处理
            page_url = self.page_queue.get()
            # 取出后发送请求
            resp = requests.get(page_url, headers=headers)
            # 将请求来的页面数据转换为json类型(页面请求类型为Ajax请求)
            json_data = resp.json()
            d = {}
            # 获取json数据中的list项
            data_list = json_data['List']
            # 获取壁纸名字与url,将名字与url按字典键与值对应拼接起来
            for data in data_list:
                # 调用函数exact_url(data)处理好的壁纸url数据
                image_url_lst = exact_url(data)
                # 获取壁纸名字,将url数据解码(parse.unquote()方法)
                sProdName = parse.unquote(data['sProdName'])
                # 字典的强制增加,dict[键] = 值
                d[sProdName] = image_url_lst
            # 创建本地文件夹路径,将处理好的路径与url放入队列2
            for key in d:
                # 拼接路径(path.join()),创建目录文件夹: image/马可波罗-暗影游猎 image/李信—一念神魔
                # strip()方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。
                dirpath = os.path.join('img/王者荣耀壁纸/', key.strip(' '))
                # 判断路径是否存在,如果不存在则创建路径(path.exists())
                if not os.path.exists(dirpath):
                    os.mkdir(dirpath)
                # 下载图片并保存,遍历字典中的每一个值,此时key索引为字典中每一个键(dict[键] = 值)
                # enumerate() 函数用于将一个可遍历的数据对象(如列表、元组或字符串)组合为一个索引序列,同时列出数据和数据下标
                for index, image_url in enumerate(d[key]):
                    # 将处理好的({图片路径,下载地址})打包,以字典的形式放入队列2中.
                    self.image_url_queue.put(
                        {'image_path': os.path.join(dirpath, f'{index + 1}.jpg'), 'image_url': image_url})

# 消费者线程
class Customer(threading.Thread):
    def __init__(self, image_url_queue):
        super().__init__()
        self.image_url_queue = image_url_queue

    def run(self):
        # 持续下载
        while True:
            # 当程序20秒无响应时报错,退出程序
            try:
                # 从队列2中取出数据,设置响应时间20S
                image_obj = self.image_url_queue.get(timeout=20)
                # 将URL表示的网络对象复制/下载到本地文件,request.urlretrieve(url,保存路径)。
                request.urlretrieve(image_obj['image_url'], image_obj['image_path'])
                print(f'{image_obj["image_path"]}下载完成')
            except:
                break

def start():
    # 创建队列1,用于存储每个页面url的队列(共22页)
    page_queue = Queue(22)
    # 创建队列2,用于存储图片路径的队列
    image_url_queue = Queue(1000)
    # 序列索引迭代22页url,批量放入队列1
    for i in range(0, 3):
        # 格式字符串语法,在字符串前加f,字符串中{}内输入替换内容,用途同str.format()方法
        page_url = f'https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page={i}&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1616725860371'
        # print(page_url)
        # 利用循环,将请求url批量放入队列1
        page_queue.put(page_url)

    # 创建生产者线程对象
    for i in range(5):
        # 调用类方法,实例化类
        th = Producer(page_queue, image_url_queue)
        # 调用父类start()方法
        th.start()

    # 创建消费者线程队列
    for i in range(10):
        # 取
        th = Customer(image_url_queue)
        th.start()
        
if __name__ == '__main__':
    start()

你可能感兴趣的