爬虫之多任务异步协程

gevent模块

示例代码:

特点: 可以识别所有阻塞,但是没有回调函数

from gevent import monkey
monkey.patch_all()
import gevent
import requests
from lxml import etree
import time

# 发送请求
def get_request(url):
    page_text = requests.get(url).text
    tree = etree.HTML(page_text)
    print(len(tree.xpath('//div[1]//text()')))
    return page_text

# 解析数据
def parse(page_text):
    tree = etree.HTML(page_text)
    print(tree.xpath('//div[1]//text()'))

start = time.time()
g1 = gevent.spawn(get_request, 'http://127.0.0.1:5000/index')
g2 = gevent.spawn(get_request, 'http://127.0.0.1:5000/hxbs')
gevent.joinall([g1, g2])
print(time.time() - start)

asyncio模块

安装: pip install asyncio

特点: 只能识别支持异步的模块的阻塞

协程对象

import asyncio
from time import sleep

# 特殊的函数: 如果一个函数的定义被async关键字修饰,则该函数是一个特殊函数
async def get_request(url):
    print('正在请求:', url)
    sleep(1)
    print('请求结束:', url)

# 特殊函数被调用后,函数内部的语句不会立即执行
# 特殊函数的调用返回一个协程对象
c = get_request('www.1.com')

# 结论1: 协程对象 == 特殊的函数

任务对象

任务对象其实就是对协程对象的进一步封装,并且可以给任务对象绑定回调

结论2: 任务对象 == 高级的协程对象 == 特殊的函数

# 定义回调函数,接收的参数是任务对象,任务对象执行结束后执行
def parse(task):
    ret = task.result()  # 取得任务对象(特殊函数)的返回值
# 创一个任务对象: 基于协程对象创建
# task就是一个任务对象
task = asyncio.ensure_future(c)
# 绑定回调函数
task.add_done_callback(parse)

事件循环对象

作用: 将其内部注册的任务对象进行异步执行

# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# 将任务对象注册到事件循环对象中并且开启事件循环
loop.run_until_complete(task)

注意: 事件循环对象中注册多个任务对象时,需要使用async.wait()对任务列表进行挂起操作

loop.run_until_complete(async.wait(task_list))

编码流程:

  • 定义特殊函数
  • 创建协程对象
  • 封装任务对象
  • 创建事件循环对象
  • 将任务对象注册到事件循环对象中并且开启事件循环

注意: 在特殊函数内部的实现语句中,不可以出现不支持异步的模块对应的代码,否则就会终止多任务异步协程的异步效果

注意: reuqests模块不支持异步,需要使用aiothttp模块进行爬取,此模块使用方法和requests高度相似,

示例代码:

import asyncio
import time

async def get_request(url):
    print('正在请求:', url)
    # time模块不支持异步
    # time.sleep(1)
    # 需要使用await关键字对阻塞操作进行等待
    await asyncio.sleep(1)
    print('请求结束:', url)

urls = [
    'www.1.com',
    'www.2.com',
    'www.3.com'
]
task_list = []  # 存放多个任务对象的列表
for url in urls:
    c = get_request(url)
    task = asyncio.ensure_future(c)
    task_list.append(task)
# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# 将任务对象注册到事件循环对象中并且开启事件循环
loop.run_until_complete(asyncio.wait(task_list))

aiohttp模块

安装: pip install aiohttp

特点: 支持异步的网络请求模块

编码流程:

  • 写基本架构:

    # aiohttp创建的对象都需要关闭,使用with
    with aiohttp.ClientSession() as cs:
        # 注意get/post: proxy = 'http://ip:port'
        # 其余参数与requests模块一致
        with cs.get(url) as response:
            # text()字符串形式的响应数据
            # read()二进制的响应数据
            page_text = response.text()
            return  page_text
  • 补充细节:

    添加async关键字: 每一个with前加上async

    添加await关键字: 在每一步的阻塞操作前加上await

    • 请求(get(), post()等)
    • 获取相应数据(text(), read())

asyncio + aiohttp 实现多任务协程爬虫:

简易服务器:

# 搭建简易服务器
from flask import Flask, render_template
from time import sleep
app = Flask(__name__)

@app.route('/hxbs')
def index_hxbs():
    sleep(2)
    return render_template('测试.html')

@app.route('/index')
def index_ceshi():
    sleep(2)
    return render_template('测试.html')

app.run(debug=True)

异步爬虫

# 异步爬虫
import asyncio
import aiohttp
import time
from lxml import etree

async def get_request(url):

    async with aiohttp.ClientSession() as cs:
        async with await cs.get(url) as response:
            page_text = await response.text()
            return page_text

def parse(task):
    page_text = task.result()
    tree = etree.HTML(page_text)
    data = tree.xpath('//div[2]//text()')[0]
    print(data)
urls = [
    'http://127.0.0.1:5000/hxbs',
    'http://127.0.0.1:5000/hxbs',
    'http://127.0.0.1:5000/index',
    'http://127.0.0.1:5000/index',
]
start = time.time()
task_list = []
for url in urls:
    c = get_request(url)  # 协程对象
    task = asyncio.ensure_future(c)  # 任务对象
    task.add_done_callback(parse)  # 绑定回调,用于数据解析
    task_list.append(task)

loop = asyncio.get_event_loop()  # 事件循环对象
loop.run_until_complete(asyncio.wait(task_list))  # 注册任务
print('总耗时:', time.time() - start)

selenium模块

概念: 基于浏览器自动化的模块,都用于自动化测试

特性: 不支持异步,效率较低

selenium和爬虫之间的关联:

  • 便捷的爬取到动态加载的数据
    • 可见即可得
  • 便捷的实现模拟登录

拓展: Appium基于手机的自动化的模块

基本使用:

  • 环境安装: pip install selenium

  • 下载浏览器的驱动程序

    # google浏览器驱动程序
    http://chromedriver.storage.googleapis.com/index.html
    # 驱动程序与浏览器版本映射关系
    https://blog.csdn.net/huilan_same/article/details/51896672

演示代码:

from selenium import webdriver
from time import sleep

# 后面是你的浏览器驱动位置,记得前面加r'','r'是防止字符转义的
driver = webdriver.Chrome(r'chromedriver.exe')
# 用get打开百度页面
driver.get("http://www.baidu.com")
# 查找页面的“设置”选项,并进行点击
driver.find_elements_by_link_text('设置')[0].click()
sleep(2)
# # 打开设置后找到“搜索设置”选项,设置为每页显示50条
driver.find_elements_by_link_text('搜索设置')[0].click()
sleep(2)

# 选中每页显示50条
m = driver.find_element_by_id('nr')
sleep(2)
m.find_element_by_xpath('//*[@id="nr"]/option[3]').click()
m.find_element_by_xpath('.//option[3]').click()
sleep(2)

# 点击保存设置
driver.find_elements_by_class_name("prefpanelgo")[0].click()
sleep(2)

# 处理弹出的警告页面   确定accept() 和 取消dismiss()
driver.switch_to.alert.accept()
sleep(2)
# 找到百度的输入框,并输入 美女
driver.find_element_by_id('kw').send_keys('美女')
sleep(2)
# 点击搜索按钮
driver.find_element_by_id('su').click()
sleep(2)

# 关闭浏览器
driver.quit()

基本使用

from selenium import webdriver
import time
# 实例化某一款浏览器对象
browser = webdriver.Chrome(executable_path=r'chromedriver.exe')
# 基于浏览器发起请求
browser.get('https://www.jd.com')

# 商品搜索
# 标签定位
search_input = browser.find_element_by_id('key')
# 向定位到的标签中录入数据
search_input.send_keys('python')
# 点击搜索按钮
btn = browser.find_element_by_xpath('//*[@id="search"]/div/div[2]/button/i')
btn.click()

# 滚轮滑动(JS注入)
browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
time.sleep(2)

# 关闭浏览器
browser.quit()

爬取动态加载的数据

from selenium import webdriver
from lxml import etree
import time
# 实例化某一款浏览器对象
browser = webdriver.Chrome(executable_path=r'chromedriver.exe')
# 基于浏览器发起请求
browser.get('https://www.fjggfw.gov.cn/Website/JYXXNew.aspx')
# page_source: 当前页面所有的页面源码数据
page_text = browser.page_source
# 存储前三页对应的页面源码数据
all_page_text = [page_text]

for i in range(2, 4):
    next_page_btn = browser.find_element_by_xpath(f'//body//a[@onclick="return kkpager._clickHandler({i})"]')
    next_page_btn.click()
    time.sleep(1)
    page_text = browser.page_source
    all_page_text.append(page_text)

for page_text in all_page_text:
    tree = etree.HTML(page_text)
    title = tree.xpath('//*[@id="list"]/div[1]/div/h4/a/text()')[0]
    print(title)

动作链

注意:

在使用find系列的函数进行标签定位时,如果出现NoSuchElementException错误怎么处理?

  • 如果定位的标签是存在于iframe标签之下(子页面)中,则在进行指定标签定位的时,必须使用switch_to.frame(frame_id)的操作才可.
from selenium import webdriver
from selenium.webdriver import ActionChains
import time

# 实例化某一款浏览器对象
browser = webdriver.Chrome(executable_path=r'chromedriver.exe')
# 基于浏览器发起请求
browser.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')

# NoSuchElementException: 定位标签时出现此错误,考虑是否为子页面内的标签
browser.switch_to.frame('iframeResult')  # 参数是iframe标签的id属性值
div_tag = browser.find_element_by_id('draggable')

# 基于动作链实现滑动操作
# 实例化一个动作链对象: 指定浏览器
action = ActionChains(browser)
# 点击且长按
action.click_and_hold(div_tag)
for i in range(4):
    # perform()表示让动作链立即执行
    action.move_by_offset(25, 0).perform()  # (x, y)
    time.sleep(0.5)

time.sleep(2)
browser.quit()

无头浏览器

没有可视化界面的浏览器

  • phantomjs
  • 谷歌无头浏览器(推荐使用)
from selenium import webdriver

from selenium.webdriver.chrome.options import Options
# 创建一个参数对象,用来控制chrome以无界面模式打开
options = Options()
options.add_argument('--headless')
options.add_argument('--disable-gpu')

# 设置浏览器
browser = webdriver.Chrome(executable_path='chromedriver.exe', options=options)
browser.get('https://www.taobao.com/')
print(browser.page_source)

如何规避selenium被监测到的风险?

  • 网站可以根据:window.navigator.webdriver的返回值鉴定是否使用了selenium
    • undefind:正常
    • true:使用了selenium
from selenium import webdriver
from time import sleep
from selenium.webdriver import ChromeOptions

option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])

# 后面是浏览器驱动位置,记得前面加r'','r'是防止字符转义的
browser = webdriver.Chrome(r'chromedriver.exe',options=option)

browser.get('https://www.taobao.com/')

模拟登录12306

from ChaoJiYing import Chaojiying_Client
from selenium import webdriver
from selenium.webdriver import ActionChains
from PIL import Image  # 图片操作模块
import time

def get_code(imgPath, imgType):
    chaojiying = Chaojiying_Client('账号', '账号', '901814')
    im = open(imgPath, 'rb').read()
    return chaojiying.PostPic(im, imgType)['pic_str']

browser = webdriver.Chrome(executable_path='chromedriver.exe')
browser.get('https://kyfw.12306.cn/otn/login/init')
time.sleep(2)

# 输入用户名和密码
username_input = browser.find_element_by_id('username')
username_input.send_keys('账号')
password_input = browser.find_element_by_id('password')
password_input.send_keys('账号')
# 截图
browser.save_screenshot('main.png')
# 在main.png中截取下验证码图片
img_tag = browser.find_element_by_xpath('//*[@id="loginForm"]/div/ul[2]/li[4]/div/div/div[3]/img')
# 标签的大小{'height': 190, 'width': 293}
size = img_tag.size
# 标签左下角的坐标{'x': 276, 'y': 274}
location = img_tag.location
# 裁剪范围
range_xy = (
    int(location['x']), int(location['y']), int(location['x'] + size['width']), int(location['y'] + size['height']))
# 必须是png格式
main_png = Image.open(r'main.png')
code_png = main_png.crop(range_xy)
code_png.save('code.png')
# 105,167|105,267|125,167
code = get_code('./code.png', 9004)
# [[105, 167], [105, 267], [125, 167]]
all_list = []
if '|' in code:
    list_1 = code.split('|')
    count_1 = len(list_1)
    for i in range(count_1):
        xy_list = []
        x = int(list_1[i].split(',')[0])
        y = int(list_1[i].split(',')[1])
        xy_list.append(x)
        xy_list.append(y)
        all_list.append(xy_list)
else:
    x = int(code.split(',')[0])
    y = int(code.split(',')[1])
    xy_list = []
    xy_list.append(x)
    xy_list.append(y)
    all_list.append(xy_list)

for x, y in all_list:
    # move_to_element_with_offset以标签的左下角为起点进行偏移
    ActionChains(browser).move_to_element_with_offset(img_tag, x, y).click().perform()
    time.sleep(1)

# 点击登录
loginSub = browser.find_element_by_id('loginSub')
loginSub.click()

你可能感兴趣的