Redis 发布订阅模式介绍,Java 使用 Redis 实现广播

介绍:

Redis发布订阅(Publish / Subscribe)模式是Redis提供的一种消息通信方式,它基于消息中间件的设计思想,实现了消息的生产者和消费者的解耦。Redis 发布订阅模式的工作原理类似于消息队列,不同之处在于 Redis 发布订阅模式是一种多对多的消息通信方式,一个消息可以被多个消费者接收。

在Redis发布订阅模式中,消息的生产者将消息发布到指定的频道,而消费者则通过订阅对应的频道来接收消息。当消息生产者发布了一条消息到某个频道时,所有订阅了该频道的消费者都可以接收到这条消息。

Redis发布订阅模式包括两个核心的命令:PUBLISH 和 SUBSCRIBE。PUBLISH 命令用于向指定的频道发布一条消息,而 SUBSCRIBE 命令则用于订阅指定的频道。

原理:

  1. Redis 服务器维护了一个”订阅者列表”,该列表记录了所有订阅了至少一个频道的客户端。
  2. 当一个客户端订阅一个或多个频道时,Redis服务器会将该客户端添加到对应频道的”订阅者列表”中。
  3. 当一个客户端发布消息到某个频道时,Redis服务器会将该消息发送给该频道的所有订阅者。
  4. 当一个客户端取消订阅一个频道时,Redis服务器会将该客户端从该频道的”订阅者列表”中移除。

在 Redis 发布订阅模式中,Redis 服务器起到了一个消息中心的作用,它负责维护订阅者列表和消息发布、转发的机制。同时,Redis提供了一系列API,使得客户端可以方便地进行订阅和发布操作。

在 SpringBoot 中使用

创建监听器

@Component
public class RedisMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String s = new String(message.getBody());
        System.out.println("收到消息:" + s);
    }

}

配置监听器

@Configuration
public class RedisMessageConfig {

    /**
     * 消息监听器适配器
     * @return org.springframework.data.redis.listener.adapter.MessageListenerAdapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisMessageListener receiver) {
        //这个地方是给 messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“onMessage”
        return new MessageListenerAdapter(receiver, "onMessage");
    }
    
    /**
     * redis消息监听器容器
     * @return org.springframework.data.redis.listener.RedisMessageListenerContainer
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅了一个叫 my-channel 的频道
        container.addMessageListener(listenerAdapter, new PatternTopic("my-channel"));
        // 这个container 可以添加多个 messageListener
        return container;
    }
  
}

测试方法

    @GetMapping(value = "/testRedis")
    public void testRedis() {
        redisTemplate.convertAndSend("my-channel", JsonUtil.toJsonStr(new Student(1L, "张三")));
    }

效果

在这里插入图片描述

Java 如何获取请求原始JSON字符串

获取请求中原始JSON字符串一般有两种方式

第一种:@RequestBody

@PostMapping("/myEndpoint")
public void myMethod(@RequestBody String body) {
    System.out.println(body);
}

第二种:HttpServletRequest

@PostMapping("/myEndpoint")
public void myMethod(HttpServletRequest request) {
    String body = "";
    try (BufferedReader reader = request.getReader()) {
        StringBuilder stringBuilder = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            stringBuilder.append(line);
        }
        body = stringBuilder.toString();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println(body);
}
Java实现通用本地延迟队列

这里使用了delayed 来实现延迟队列。

1. 定义函数接口

@FunctionalInterface
public interface EventFunction {

    void apply();
}

这个是用于执行函数,Java自带的有 Consumer 和 Function 等其他,因为我这边不需要入参和结果,所以我自己重新定义了一个。


2. 定义实体类

public class DelayedEntity implements Delayed {

    private final String id;
    private final long expireTime;
    private final EventFunction function;

    public DelayedEntity(long expireTime, EventFunction function) {
        super();
        this.expireTime = expireTime;
        this.function = function;
        // HuTool工具自动生成id,用于排查问题
        this.id = IdUtil.fastSimpleUUID();
    }

    public void execute() {
        function.apply();
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return expireTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    public String getId() {
        return id;
    }
}

3. 定义消费者

@Slf4j
@Component
public class DelayEventConsumer implements Runnable {

    private final DelayQueue<DelayedEntity> queue = new DelayQueue<>();

    public DelayEventConsumer() {
    }

    public void submit(long expireTime, EventFunction function) {
        queue.put(new DelayedEntity(expireTime, function));
    }


    @Override
    public void run() {
        DelayedEntity entity = null;
        while (true) {
            try {
                entity = queue.poll();
                if (entity == null) {
                    continue;
                }
                log.info("延迟队列 [{}] 执行消息", entity.getId());
                entity.execute();
                log.info("延迟队列 [{}] 执行完毕", entity.getId());
            } catch (Exception e) {
                log.error(String.format("延迟队列 [%s] 执行报错", entity != null ? entity.getId() : "null"), e);
            }
        }
    }
}

4. 定义线程池

定义一个只有一个线程的线程池,在程序启动的时候执行。

@Component
public class ApplicationStartListener implements ApplicationListener<ApplicationStartedEvent> {

    @Autowired
    private DelayEventConsumer delayEventConsumer;

    @Override
    public void onApplicationEvent(@NotNull ApplicationStartedEvent event) {
        // HuTool工具生成只有一个线程的线程池,将消费者放进去执行
        ThreadUtil.newSingleExecutor().submit(delayEventConsumer);
    }

}

5. 测试

写了个接口用来测试

@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {
  private final DelayEventConsumer delayEventConsumer;
  private final UserTagService userTagService;
  
  @GetMapping("/testDelayQueue")
    public void testDelayQueue() {
        log.info("提交事件");
        // 延迟2秒执行
        delayEventConsumer.submit(System.currentTimeMillis() + 2000, () -> {
            List<UserTags> userTags = userTagService.randomRobot();
            log.info(JsonUtil.toJsonStr(userTags));
        });
    }
}

结果

2023-03-08 17:27:49.631  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : 提交事件
2023-03-08 17:27:51.755  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : 延迟队列 [c195406b95a048f7b27825f6839dbe98] 执行消息
2023-03-08 17:27:52.475  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : [{"country":"IN","createTime":1677764575237,"id":4976,"level":20,"nickname":"👑 ptB_Colin","sid":104485,"type":"robot","userId":1368812591190769664}]
2023-03-08 17:27:52.476  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : 延迟队列 [c195406b95a048f7b27825f6839dbe98] 执行完毕
Mybatis XML转义字符
<![CDATA[ < ]]>
转义字符 符号 名称
&lt; < 小于
&gt; > 大于
&lt;= <= 小于等于
&gt;= >= 大于等于
&amp; &
&apos; 单引号
&quot; 双引号
SpringBoot + Jfinal Enjoy + JdbcTemplate 整合

最近发现了 Jfinal 的 ORM 非常好用,于是想着将其抽出来,因为公司目前基于 JdbcTemplate 简单做了封装,将 SQL 语句写在 SQL 文件中,然后读取出来用 JdbcTemplate 执行,跟 Jfinal 是类似的,所以想将其抽出来扩展公司当前的框架

下载链接:https://pan.quark.cn/s/bac2b41ffc60

核心代码讲解

项目所用到的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.jfinal</groupId>
    <artifactId>enjoy</artifactId>
    <version>5.0.0</version>
</dependency>

自建2个注解,用于表示实体类字段名

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
public @interface DbEntity {
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.ANNOTATION_TYPE})
public @interface DbColumn {

    String value() default "";

    boolean exist() default true;
}

DbMapper:用于保存 java 属性与数据库字段的映射关系

public class DbMapper {

    private static final Map<Class<?>, Map<String, String>> classColMap =  new HashMap<>();

    private static final Map<Class<?>, Map<String, Method>> classSetterMap = new HashMap<>();

    private static final Map<Class<?>, Map<String, Class<?>>> classFieldTypeMap = new HashMap<>();

    public static Map<String, String> columnMap(Class<?> clazz) {
        return classColMap.get(clazz);
    }

    public static String toSetterName(String fieldName) {
        String firstLetter = fieldName.substring(0, 1);
        return "set" + fieldName.replaceFirst(firstLetter, firstLetter.toUpperCase());
    }

    public static Method setter(Class<?> entityClass, String field) {
        return classSetterMap.get(entityClass).get(field);
    }

    public static void classColMap(Class<?> clazz, Map<String, String> colMap) {
        classColMap.put(clazz, colMap);
    }

    public static void classSetterMap(Class<?> clazz, Map<String, Method> setterMap) {
        classSetterMap.put(clazz, setterMap);
    }

    public static void classFieldTypeMap(Class<?> clazz, Map<String, Class<?>> fieldTypeMap) {
        classFieldTypeMap.put(clazz, fieldTypeMap);
    }

    public static Class<?> getClassFieldType(Class<?> clazz, String fieldName) {
        return classFieldTypeMap.get(clazz).get(fieldName);
    }

读取实体类并将其属性与字段的映射保存

        /**
     * 扫描包下全部类
     *
     * @author vital
     */
    public void scanner() {
        String basePackage = "net.vitalblog.edb.entity";
        ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
        try {
            String pattern = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + ClassUtils.convertClassNameToResourcePath(basePackage) + "/**/*.class";
            org.springframework.core.io.Resource[] resources = resourcePatternResolver.getResources(pattern);
            MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
            for (org.springframework.core.io.Resource resource : resources) {
                // 用于读取类信息
                MetadataReader reader = readerFactory.getMetadataReader(resource);
                // 扫描到的class
                String classname = reader.getClassMetadata().getClassName();
                Class<?> clazz = Class.forName(classname);
                // 判断是否有指定注解
                if (clazz.getAnnotation(DbEntity.class) != null) {
                    register(clazz);
                }
            }
        } catch (IOException | ClassNotFoundException e) {
            log.error("error", e);
        }
    }

    /**
     * 将类信息保存
     *
     * @param clazz 实体类
     * @author vital
     */
    private <T> void register(Class<T> clazz) {
        List<Field> fieldList = new ArrayList<>(Arrays.asList(clazz.getDeclaredFields()));
        Class<? super T> superclass = clazz;
        // 判断是否有父类
        for (int i = 0; i < 5; i++) {
            superclass = superclass.getSuperclass();
            if (superclass == null) {
                break;
            }
            fieldList.addAll(Arrays.asList(superclass.getDeclaredFields()));
        }
        Map<String, Class<?>> fieldTypeMap = new HashMap<>(fieldList.size() + 1);
        Map<String, String> colMap = new HashMap<>(fieldList.size() + 1);
        Map<String, Method> setterMap = new HashMap<>(fieldList.size() + 1);

        for (Field field : fieldList) {
            String fieldName = field.getName();
            DbColumn column = field.getAnnotation(DbColumn.class);
            if (column != null && !column.exist()) {
                continue;
            }
            String colName = column != null ? column.value() : camelCase2UnderScoreCase(fieldName);
            fieldTypeMap.put(fieldName, field.getType());
            colMap.put(colName, fieldName);
            try {
                Method setter = clazz.getMethod(DbMapper.toSetterName(fieldName), field.getType());
                setterMap.put(fieldName, setter);
            } catch (Exception e) {
                log.error("error", e);
            }
        }

        DbMapper.classFieldTypeMap(clazz, fieldTypeMap);
        DbMapper.classColMap(clazz, colMap);
        DbMapper.classSetterMap(clazz, setterMap);
    }

    /**
     * 驼峰命名转下划线命名
     *
     * @param str 驼峰字段名
     * @return java.lang.String
     * @author vital
     */
    public static String camelCase2UnderScoreCase(String str) {
        Pattern compile = Pattern.compile("[A-Z]");
        Matcher matcher = compile.matcher(str);
        StringBuffer sb = new StringBuffer();
        while(matcher.find()) {
            matcher.appendReplacement(sb,  "_" + matcher.group(0).toLowerCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

RowMapper映射

public class CommonRowMapper<T> implements RowMapper<T> {

    private final Class<T> entityClass;

    public CommonRowMapper(Class<T> entityClass) {
        this.entityClass = entityClass;
    }

    @Override
    public T mapRow(ResultSet rs, int rowNum) throws SQLException {
        ResultSetMetaData metaData = rs.getMetaData();
        int colAmount = metaData.getColumnCount();
        Map<String, String> columnMap = DbMapper.columnMap(this.entityClass);
        T result = null;
        try {
            result = this.entityClass.newInstance();
            for (int index = 1; index <= colAmount; index++) {
                String column = JdbcUtils.lookupColumnName(metaData, index);
                String fieldName = columnMap.get(column);
                if (StringUtils.isBlank(fieldName)) {
                    continue;
                }
                Class<?> fieldType = DbMapper.getClassFieldType(this.entityClass, fieldName);
                Object value = JdbcUtils.getResultSetValue(rs, index, fieldType);
                if (value == null) {
                    continue;
                }
                Method setter = DbMapper.setter(this.entityClass, fieldName);
                setter.invoke(result, value);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return result;
    }

}

使用

public <T> T findOne(String sql, Class<T> entityClass, Object... paras) {
        return jdbcTemplate.queryForObject(sql, new CommonRowMapper<>(entityClass), paras);
}

测试效果:

手写HashMap

首先定义接口:

public interface VMap<K, V> {

    V put(K key, V value);

    V get(K key);

    Integer size();

    interface Entry<K, V> {

        K getKey();

        V getValue();
    }
}

实现接口:

import java.util.Arrays;

public class VHashMap<K, V> implements VMap<K, V> {

    //大小
    private Integer size = 0;
    //默认大小
    private Integer defaultLength = 16;
    //负载因子
    private double loadFactor = 0.75D;
    //存储数据的数组
    private Entry<K, V>[] table;

    VHashMap(Integer _defaultLength, double _loadFactor) {
        this.defaultLength = _defaultLength;
        this.loadFactor = _loadFactor;
        this.table = new Entry[defaultLength];
    }

    VHashMap() {
        this.table = new Entry[defaultLength];
    }

    Integer getIndex(K key) {
        return Math.abs((key.hashCode() % (defaultLength - 1)));
    }

    @Override
    public V put(K key, V value) {
        //获取下标
        Integer index = getIndex(key);
        //扩容
        if (table.length <= index + 1) {
            Integer len = table.length + (int) (table.length * loadFactor);
            Entry<K, V>[] newTable = Arrays.copyOf(table, len);
            this.table = newTable;
        }
        //判断下标是否被占用
        Entry<K, V> kvEntry = table[index];
        //没有被占用
        if (kvEntry == null) {
            table[index] = new Entry(key, value, null, index);
            size++;
        } else {
            //判断是否相同的key
            if (kvEntry.key.equals(key)) {
                //覆盖
                table[index] = new Entry(key, value, kvEntry.next, index);
            } else {
                //把新的值放进去
                table[index] = new Entry(key, value, kvEntry, index);
                size++;
            }
        }

        return table[index].getValue();
    }

    @Override
    public V get(K key) {
        Integer index = getIndex(key);
        Entry<K, V> kvEntry = table[index];
        do {
            if (kvEntry.key.equals(key)) {
                return table[index].getValue();
            }
            Entry<K, V> next = kvEntry.next;
            if (next == null) {
                return null;
            }
            kvEntry = kvEntry.next;
        } while (!key.equals(kvEntry.key));

        return kvEntry.getValue();
    }

    @Override
    public Integer size() {
        return size;
    }

    class Entry<K, V> implements VMap.Entry<K, V> {

        private Entry<K, V> next;
        private Integer index;
        private K key;
        private V value;

        Entry(K _key, V _value, Entry<K, V> _next, Integer _index) {
            this.key = _key;
            this.value = _value;
            this.next = _next;
            this.index = _index;
        }

        Entry() {
        }

        @Override
        public K getKey() {
            return this.key;
        }

        @Override
        public V getValue() {
            return this.value;
        }
    }
}