SpringBoot整合WebSocket+Redis实现实时数据推送到web界面

pom文件

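//需要的依赖

	<dependencies>
		<dependency>
			<groupId>io.swagger</groupId>
			<artifactId>swagger-annotations</artifactId>
			<version>1.6.2</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<dependency>
			<groupId>com.github.wvengen</groupId>
			<artifactId>proguard-maven-plugin</artifactId>
			<version>2.0.4</version>
		</dependency>
	</dependencies>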

application.yml 文件:配置 Redis 连接参数

# Redis connection settings.
# NOTE(review): Spring Boot documents this prefix in lowercase (spring.redis.*);
# consider lowercasing the root key — confirm before changing.
Spring:
  redis:
    # Placeholder text ("the server's IP address") — replace with the real Redis host.
    host: 服务器的ip地址
    # Left empty in this example.
    password: 
    port: 6379
    jedis:
      pool:
        # NOTE(review): presumably only takes effect when the Jedis client is the
        # one in use — confirm which Redis client is on the classpath.
        max-active: 8


# HTTP port of the application.
server:
  port: 8080

RedisConfig.java:Spring 连接 Redis 服务器。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Redis pub/sub wiring: a listener container, the adapter for the default
 * listener, and a {@link StringRedisTemplate}.
 */
@Configuration
public class RedisConfig {

    /**
     * Creates the message listener container and subscribes the default adapter
     * to the {@code chat} topic. The bean is named {@code container}.
     *
     * @param connectionFactory the Redis connection factory provided by Spring
     * @param listenerAdapter   adapter wrapping the default message listener
     * @return the configured listener container
     */
    @Bean("container")
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // Use the injected factory as-is. Down-casting it to a specific client
        // implementation fails with ClassCastException when another client is
        // configured; the database index belongs in the connection configuration
        // rather than being mutated on an already-built factory.
        container.setConnectionFactory(connectionFactory);
        // The default listener subscribes to the "chat" topic.
        container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
        return container;
    }

    /**
     * Wraps the {@link RedisMessageListener} bean in a {@link MessageListenerAdapter}.
     *
     * @param receiver the listener that handles incoming messages
     * @return the adapter delegating to {@code receiver}
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisMessageListener receiver) {
        return new MessageListenerAdapter(receiver);
    }

    /**
     * Exposes a {@link StringRedisTemplate} built on the shared connection factory.
     *
     * @param connectionFactory the Redis connection factory provided by Spring
     * @return a new string template
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

RedisMessageListener.java:监听频道发布的消息并传给 WebSocket

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import javax.websocket.Session;
import java.io.IOException;

/**
 * {@link MessageListener} implementation that handles subscribed Redis messages
 * and pushes them to a user's WebSocket session when that session is open.
 */
@Component
public class RedisMessageListener implements MessageListener {

    /** WebSocket session of the user; {@code null} until {@link #setSession} is called. */
    private Session session;

    /** ID of the user this listener pushes to. */
    private String userId;

    /** Online-user count supplied through {@link #setOnlineCount}; not refreshed by this class. */
    private Integer onlineCount;

    public Integer getOnlineCount() {
        return onlineCount;
    }

    public void setOnlineCount(Integer onlineCount) {
        this.onlineCount = onlineCount;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * Handles one published message. If the session is open, the message is
     * formatted and sent over it; otherwise, if a user ID is known, the offline
     * hook {@link #doLiXian(String)} is invoked. With neither, the message is ignored.
     *
     * @param message the Redis message (channel and body)
     * @param pattern the pattern that matched the channel; when {@code null} the
     *                channel name is used as the topic instead
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
        byte[] topicBytes = pattern != null ? pattern : message.getChannel();
        String topic = new String(topicBytes, java.nio.charset.StandardCharsets.UTF_8);
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);

        if (null != session && session.isOpen()) {
            // User is online and subscribed to the topic.
            String text = "用户ID是:" + userId + "您好!  您正则与: " + onlineCount + "  人在线观看," +
                    "共同订阅的话题:《" + topic + "》发布了消息,内容是:《" + body + "》";
            System.out.println(text);
            try {
                // Serialise sends on the same session.
                synchronized (session) {
                    session.getBasicRemote().sendText(text);
                }
            } catch (IOException e) {
                // Report the cause instead of dropping it.
                System.out.println("发送消息异常: " + e);
            }
        } else if (userId != null) {
            // User is subscribed but not online.
            System.out.println("用户:  " + userId + "  当前不在线,但是他已经订阅了,所以我们无法给他实时推出数据");
            doLiXian(userId);
        }
    }

    /**
     * Offline-delivery hook; currently only prints a line describing what could
     * be done for the user (for example e-mail or SMS).
     *
     * @param userId ID of the offline user
     */
    public void doLiXian(String userId) {
        System.out.println(userId + "我们可以根据用户的ID来给用户发送一些消息,都在这个方法里完成,比如发邮件、发短信之类的");
    }
}

WebSocketConfig.java文件

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Declares the {@link ServerEndpointExporter} bean. Per the original note, once
 * it is configured every WebSocket endpoint declared with {@code @ServerEndpoint}
 * is registered automatically.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

WebsocketEndpoint.java文件用于与前端交互

import java.util.Date;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;


import com.example.demo.RedisMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.RestController;


/**
 * WebSocket endpoint declared with {@code @ServerEndpoint}.
 * The two path parameters are the user ID and the topic the user subscribes to.
 */
@ServerEndpoint("/socket/{userId}/{topic}")
@RestController
public class WebsocketEndpoint {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketEndpoint.class);

    /** Current number of connections; changed only through the synchronized static helpers. */
    private static volatile int onlineCount = 0;

    /** Thread-safe set holding the endpoint instance of every connected client. */
    private static final CopyOnWriteArraySet<WebsocketEndpoint> webSocketSet = new CopyOnWriteArraySet<>();

    /** Session of this client's connection; used to send data to the client. */
    private Session session;

    /** Listener container bean, looked up by its bean name {@code container}. */
    private RedisMessageListenerContainer container = SpringUtils.getBean("container");

    /** Redis listener created for this connection in {@link #onOpen}. */
    private RedisMessageListener listener;

    /**
     * Handles a newly opened socket: records the connection, bumps the online
     * count and subscribes a per-connection Redis listener to the topic.
     *
     * @param session the WebSocket session
     * @param userId  user ID taken from the path
     * @param topic   topic taken from the path
     * @throws Exception if registration fails
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("topic") String topic) throws Exception {
        LOGGER.info("打开了Socket链接Open a html. userId={}, topic={}", userId, topic);
        this.session = session;
        // Track this endpoint instance and count it as online.
        webSocketSet.add(this);
        addOnlineCount();

        RedisMessageListener created = new RedisMessageListener();
        created.setSession(session);
        created.setUserId(userId);
        // The listener receives the count as it is at connection time.
        created.setOnlineCount(getOnlineCount());
        listener = created;
        container.addMessageListener(created, new PatternTopic(topic));
    }

    /**
     * Handles socket close: drops the connection from the set, decrements the
     * online count and unsubscribes the Redis listener.
     */
    @OnClose
    public void onClose() {
        // Only decrement when this instance was actually registered, so the
        // counter cannot drift below the number of tracked connections.
        if (webSocketSet.remove(this)) {
            subOnlineCount();
        }
        getOnlineCount();
        // The listener is absent if onOpen never got as far as creating it.
        if (listener != null) {
            container.removeMessageListener(listener);
            listener = null;
        }
        LOGGER.info("关闭了Socket链接Close a html. ");
    }

    /**
     * Handles a text message from the client by echoing it back.
     *
     * @param message the received text
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        getOnlineCount();
        LOGGER.info("收到一条数据消息,Receive a message from client: {}{}", message, session.getId());
        try {
            this.sendMessage(message);
        } catch (Exception e) {
            LOGGER.error("Failed to send message to client, sessionId={}", session.getId(), e);
        }
    }

    /**
     * Logs a socket error.
     *
     * @param session the affected session
     * @param error   the error that occurred
     */
    @OnError
    public void onError(Session session, Throwable error) {
        LOGGER.error("socket链接错误错误Error while html. ", error);
    }

    /**
     * Sends a text message to this client if its session is still open.
     *
     * @param message the payload to send
     * @throws Exception if sending fails
     */
    public void sendMessage(String message) throws Exception {
        if (this.session.isOpen()) {
            getOnlineCount();
            this.session.getBasicRemote().sendText("Send a message from server. " + message);
        }
    }

    /**
     * Prints and returns the current online count.
     *
     * @return the current number of connections
     */
    public static synchronized int getOnlineCount() {
        System.out.println(new Date() + "在线人数为" + onlineCount);
        return onlineCount;
    }

    /** Increments the online count by one. */
    public static synchronized void addOnlineCount() {
        WebsocketEndpoint.onlineCount++;
    }

    /** Decrements the online count by one. */
    public static synchronized void subOnlineCount() {
        WebsocketEndpoint.onlineCount--;
    }
}

SpringUtils.java:Spring 工具类

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;

/**
 * Static access to the Spring bean factory for code that cannot use injection.
 * The factory is captured when Spring invokes {@link #postProcessBeanFactory}.
 */
@Component
public class SpringUtils implements BeanFactoryPostProcessor {

    /** The bean factory captured in {@link #postProcessBeanFactory}; {@code null} before that call. */
    private static ConfigurableListableBeanFactory beanFactory;

    /**
     * Stores the bean factory in a static field for later lookups.
     *
     * @param beanFactory the factory passed in by Spring
     * @throws BeansException declared by the interface
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    /**
     * @return the captured bean factory, or {@code null} if none has been stored yet
     */
    public static ConfigurableListableBeanFactory getBeanFactory() {
        return beanFactory;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name the bean name
     * @param <T>  the type the caller expects; the cast is unchecked
     * @return the bean instance registered under {@code name}
     * @throws BeansException if the lookup fails
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        // Unchecked: the caller chooses T; a mismatch surfaces as ClassCastException at the call site.
        return (T) getBeanFactory().getBean(name);
    }

    /**
     * Looks up a bean by its required type.
     *
     * @param clz the required type
     * @param <T> the bean type
     * @return the bean of type {@code clz}
     * @throws BeansException if the lookup fails
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        return getBeanFactory().getBean(clz);
    }

    /**
     * Reports whether the bean factory contains a bean matching the given name.
     *
     * @param name the bean name
     * @return {@code true} if a matching bean exists
     */
    public static boolean containsBean(String name) {
        return getBeanFactory().containsBean(name);
    }

    /**
     * Reports whether the bean registered under the given name is a singleton
     * (as opposed to a prototype).
     *
     * @param name the bean name
     * @return {@code true} if the bean is a singleton
     * @throws NoSuchBeanDefinitionException if no bean definition with that name is found
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().isSingleton(name);
    }

    /**
     * Returns the type of the bean registered under the given name.
     *
     * @param name the bean name
     * @return the type of the registered object
     * @throws NoSuchBeanDefinitionException if no bean definition with that name is found
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getType(name);
    }

    /**
     * Returns the aliases of the given bean name, if it has any.
     *
     * @param name the bean name
     * @return the aliases
     * @throws NoSuchBeanDefinitionException if no bean definition with that name is found
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getAliases(name);
    }
}

Web 界面:websocket.html




    WebSocket示例
    
    


当前用户ID是:23

当前订阅的话题是: chat


 

你可能感兴趣的