Java EE——阻塞队列

队列

我们之前在数据结构讲过,队列是一种先进先出的存储数据的结构。然而事实上并不是所有的队列都是先进先出的,比如优先级队列,消息队列

消息队列就是可以指定出队列的数据的类型,例如我们在医院排队并不是所有人按先后顺序看病的,而是要看指定的科室的医生有没有把上一个病人看完。等到看完了以后,再指定从队列中取出属于自己这个科室的队列中的第一个人

中间件

像消息队列这种我们在工作中各处都有可能用到的结构,我们就会把他单独实现成一个程序,然后部署到服务器上,这样的话需要用的的时候就可以直接使用,这样的程序称之为“中间件”
比如我们之前讲的MySQL就是一个中间件

阻塞队列

阻塞队列是一种先进先出的数据结构,和普通队列不同的是,当队列是空的,我们这时候取队列元素的时候,就会阻塞等待,直到队列不为空。同样的,当队列是满的时候,我们入队列,也会阻塞等待直到队列有位置了

并且我们的阻塞队列是线程安全的

功能

生产者消费者模型

指的是多线程协同工作的方式。
例如我们一个线程是生产辣条,一个线程是吃辣条
当我们的生产的辣条到一定数量的时候,生产者就会停止生产,防止过剩。
当我们的消费者把辣条都吃完的时候,消费者就会停止吃,等待生产者生产。

解耦合

阻塞队列可以让代码解耦合

例如我们的两个服务器之间需要通信,a服务器如果直接给b服务器发送消息,两个服务器中就都有关于对方的代码

如果我们在a服务器和b服务器之间加上阻塞队列,这样的话,我们的a服务器和b服务器的耦合度就降低了。

耦合度低了就可以防止我们如果加入了另一个服务器c,我们就要大幅度调整a服务器的代码,如果有阻塞队列的话,就可以直接让c从阻塞队列中读取数据了

削峰填谷

例如我们的两个服务器之间需要通信,当a服务器给b服务器发送的请求过多时,b服务器就有可能瘫痪
如果我们在a服务器和b服务器之间加上阻塞队列,当a服务器传入的消息到达一定数量时,就不能再传入了。而当阻塞队列是空的,b服务器就不读取消息了

服务器瘫痪的原因:
服务器在处理请求时会消耗很多资源,例如内存,cpu,硬盘,带宽
如果请求过多(例如双十一),那么资源消耗的越多,资源消耗没了,服务器就瘫痪了

java标准库下的阻塞队列

我们java标准库的阻塞队列的类是BlockingQueue
其有多种实现,例如数组实现,链表实现,优先级队列实现…

demo

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(1000);
        blockingQueue.put(1);
        blockingQueue.put(2);
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
    }
}

取出元素是take方法
存入元素是put方法
当我们取出元素次数比存入还多,那么这个代码就会一直卡在这个take上

MyBlockingQueue

我们可以自己实现一个基于数组实现的阻塞队列

public class MyBlockingQueue {
    private int [] elem = new int[1000];
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0;

    public void put(int value) throws InterruptedException {
        synchronized (this){
            while (size == elem.length){
                this.wait();
            }
            elem[tail] = value;
            tail++;
            if(tail >= size){
                tail = 0;
            }
            size++;
            this.notify();
        }
    }

    public int take() throws InterruptedException {
        synchronized (this){
            while (size == 0){
                this.wait();
            }
            int ret = elem[head];
            head++;
            if(head >= size){
                head = 0;
            }
            size--;
            this.notify();
            return ret;
        }
    }

    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
        Thread producer = new Thread(() -> {
            int n = 1;
            while(true){
                try {
                    myBlockingQueue.put(1);
                    n++;
                    System.out.println("生产元素" + n);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread customer = new Thread(() -> {
            int n = 1;
            while(true){
                try {
                    myBlockingQueue.take();
                    n++;
                    System.out.println("消费元素" + n);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        customer.start();
    }
}

volatile写法

我们的volatile将队列中的元素进行修饰,防止了内存可见性问题和指令重排序问题

wait和notify写法

我们的take和put中都各有一个wait和一个notify
事实上,他们的wait都是让自己阻塞等待,而notify是使对方唤醒
也就是说,当a线程调用put方法,如果队列满了,那么就会触发wait,而只有在b线程调用了take方法,执行到notify时,才会把a线程唤醒,继续执行下面的操作

同样的,当a线程调用take方法,如果队列满了,那么就会触发wait,而只有在b线程调用了put方法,执行到notify时,才会把a线程唤醒,继续执行下面的操作

为什么是while

而之所以我们的wait条件判定是while而不是if,这是因为我们希望线程在唤醒时再判定一下队列是否真的是不满了/不空了

因为我们的wait不一定是被对方的notify唤醒的,也有可能是interrupt唤醒的

还有一种可能,当a线程调用take方法,发现队列空的,于是释放锁,阻塞等待,然后b线程调用put方法,给队列中放了一个数据,然后释放锁。这时还有一个线程c,也想调用take方法,由于我们的锁释放后并不是先来后到,因此可能线程c抢到了这把锁,取走了队列中的元素,这时线程a才从wait中唤醒,重新获得锁,但是又没有元素可以取走了

你可能感兴趣的